From b2f1e21a3bc1689fa144084de96a1ccb7de5484a Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 10 Sep 2021 09:53:30 +0800 Subject: [PATCH] [Bugs] Fix some bugs (#6586) * fix regex lazy * fix result file core * fix dynamic partition replica and table name length bug * fix replicanum 0 * fix delete bug * renew proxy Co-authored-by: morningman --- be/src/common/utils.h | 8 ++++ be/src/exec/base_scanner.cpp | 1 + be/src/exec/base_scanner.h | 2 - be/src/exec/hash_join_node.h | 7 --- be/src/exec/hash_join_node_ir.cpp | 1 + be/src/exec/olap_scanner.cpp | 6 +++ be/src/exprs/string_functions.cpp | 3 +- be/src/runtime/file_result_writer.h | 5 ++- be/src/runtime/fragment_mgr.cpp | 4 +- be/src/runtime/result_file_sink.h | 2 +- .../doris/alter/SchemaChangeHandler.java | 8 ++-- .../org/apache/doris/catalog/Catalog.java | 10 ++++- .../apache/doris/clone/TabletSchedCtx.java | 14 ++++-- .../apache/doris/clone/TabletScheduler.java | 10 ++--- .../java/org/apache/doris/common/Config.java | 10 +++++ .../org/apache/doris/common/FeConstants.java | 3 -- .../org/apache/doris/common/FeNameFormat.java | 2 +- .../common/util/DynamicPartitionUtil.java | 7 +++ .../doris/journal/bdbje/BDBEnvironment.java | 2 +- .../org/apache/doris/load/DeleteHandler.java | 6 +++ .../java/org/apache/doris/load/DeleteJob.java | 27 +++++++++--- .../java/org/apache/doris/qe/Coordinator.java | 4 +- .../org/apache/doris/qe/StmtExecutor.java | 2 +- .../apache/doris/rpc/BackendServiceProxy.java | 44 +++++++++++++++---- .../catalog/DynamicPartitionTableTest.java | 8 ++++ .../apache/doris/common/FeNameFormatTest.java | 3 ++ 26 files changed, 150 insertions(+), 49 deletions(-) diff --git a/be/src/common/utils.h b/be/src/common/utils.h index 50c6344475..cb6647acb2 100644 --- a/be/src/common/utils.h +++ b/be/src/common/utils.h @@ -53,4 +53,12 @@ void set_request_auth(T* req, const AuthInfo& auth) { } } +// This is the threshold used to periodically release the memory occupied by the expression. 
+// RELEASE_CONTEXT_COUNTER should be power of 2 +// GCC will optimize the modulo operation to &(release_context_counter - 1) +// _conjunct_ctxs will free local alloc after this probe calculations +static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 7; +static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 0, + "should be power of 2"); + } // namespace doris diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 8910c0848d..2b1c075df6 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -18,6 +18,7 @@ #include "base_scanner.h" #include "common/logging.h" +#include "common/utils.h" #include "exec/exec_node.h" #include "exprs/expr_context.h" #include "runtime/descriptors.h" diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 3d3702a2cf..957dffc350 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -94,8 +94,6 @@ protected: bool _strict_mode; int32_t _line_counter; - // reference to HASH_JOIN_NODE::RELEASE_CONTEXT_COUNTER - const static constexpr int32_t RELEASE_CONTEXT_COUNTER = 1 << 5; // Profile RuntimeProfile* _profile; RuntimeProfile::Counter* _rows_read_counter; diff --git a/be/src/exec/hash_join_node.h b/be/src/exec/hash_join_node.h index 4e1ea92dc7..2379f7771c 100644 --- a/be/src/exec/hash_join_node.h +++ b/be/src/exec/hash_join_node.h @@ -175,13 +175,6 @@ private: std::string get_probe_row_output_string(TupleRow* probe_row); std::vector _runtime_filter_descs; - - // RELEASE_CONTEXT_COUNTER should be power of 2 - // GCC will optimize the modulo operation to &(release_context_counter - 1) - // build_expr_context and probe_expr_context will free local alloc after this probe calculations - static constexpr int RELEASE_CONTEXT_COUNTER = 1 << 5; - static_assert((RELEASE_CONTEXT_COUNTER & (RELEASE_CONTEXT_COUNTER - 1)) == 0, - "should be power of 2"); }; } // namespace doris diff --git a/be/src/exec/hash_join_node_ir.cpp 
b/be/src/exec/hash_join_node_ir.cpp index 7f885e7bdb..b512f305ab 100644 --- a/be/src/exec/hash_join_node_ir.cpp +++ b/be/src/exec/hash_join_node_ir.cpp @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +#include "common/utils.h" #include "exec/hash_join_node.h" #include "exec/hash_table.hpp" #include "exprs/expr_context.h" diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index 5f977ed463..9e1a992303 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -20,6 +20,8 @@ #include #include "gen_cpp/PaloInternalService_types.h" +#include "common/utils.h" +#include "exprs/expr_context.h" #include "olap/decimal12.h" #include "olap/field.h" #include "olap/uint24.h" @@ -294,6 +296,10 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) { VLOG_ROW << "OlapScanner input row: " << Tuple::to_string(tuple, *_tuple_desc); } + if (_num_rows_read % RELEASE_CONTEXT_COUNTER == 0) { + ExprContext::free_local_allocations(_conjunct_ctxs); + } + // 3.4 Set tuple to RowBatch(not committed) int row_idx = batch->add_row(); TupleRow* row = batch->get_row(row_idx); diff --git a/be/src/exprs/string_functions.cpp b/be/src/exprs/string_functions.cpp index 2c72bebfb6..8df42cd86f 100644 --- a/be/src/exprs/string_functions.cpp +++ b/be/src/exprs/string_functions.cpp @@ -550,8 +550,9 @@ static re2::RE2* compile_regex(const StringVal& pattern, std::string* error_str, re2::RE2::Options options; // Disable error logging in case e.g. every row causes an error options.set_log_errors(false); + // ATTN(cmy): do not set it, or the lazy mode of regex won't work. See Doris #6587 // Return the leftmost longest match (rather than the first match). 
- options.set_longest_match(true); + // options.set_longest_match(true); options.set_dot_nl(true); if (!match_parameter.is_null && !StringFunctions::set_re2_options(match_parameter, error_str, &options)) { diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h index a38ff9fe63..98ccfda490 100644 --- a/be/src/runtime/file_result_writer.h +++ b/be/src/runtime/file_result_writer.h @@ -165,8 +165,9 @@ private: // bytes of written data RuntimeProfile::Counter* _written_data_bytes = nullptr; - BufferControlBlock* _sinker; - RowBatch* _output_batch; + // _sinker and _output_batch are not owned by FileResultWriter + BufferControlBlock* _sinker = nullptr; + RowBatch* _output_batch = nullptr; // set to true if the final statistic result is sent bool _is_result_sent = false; }; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 834529c7dd..281df630d7 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -746,8 +746,8 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, selected_columns->emplace_back(std::move(col)); } - LOG(INFO) << "BackendService execute open() TQueryPlanInfo: " - << apache::thrift::ThriftDebugString(t_query_plan_info); + VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: " + << apache::thrift::ThriftDebugString(t_query_plan_info); // assign the param used to execute PlanFragment TExecPlanFragmentParams exec_fragment_params; exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0; diff --git a/be/src/runtime/result_file_sink.h b/be/src/runtime/result_file_sink.h index edeecf67e6..34d038cde0 100644 --- a/be/src/runtime/result_file_sink.h +++ b/be/src/runtime/result_file_sink.h @@ -74,7 +74,7 @@ private: boost::shared_ptr _sender; boost::shared_ptr _writer; - RowBatch* _output_batch; + RowBatch* _output_batch = nullptr; int _buf_size = 1024; // Allocated from _pool bool _is_top_sink = true; diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index a8f4ae701b..27bfa69bc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -122,7 +122,7 @@ public class SchemaChangeHandler extends AlterHandler { public int cycle_count = 0; public SchemaChangeHandler() { - super("schema change", FeConstants.default_schema_change_scheduler_interval_millisecond); + super("schema change", Config.default_schema_change_scheduler_interval_millisecond); } private void processAddColumn(AddColumnClause alterClause, OlapTable olapTable, @@ -1640,9 +1640,9 @@ public class SchemaChangeHandler extends AlterHandler { Catalog.getCurrentCatalog().convertDistributionType(db, olapTable); return; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_SEND_CLEAR_ALTER_TASK)) { - /* - * This is only for fixing bug when upgrading Doris from 0.9.x to 0.10.x. - */ + /* + * This is only for fixing bug when upgrading Doris from 0.9.x to 0.10.x. 
+ */ sendClearAlterTask(db, olapTable); return; } else if (DynamicPartitionUtil.checkDynamicPartitionPropertiesExist(properties)) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index bd8bf344da..c6218984e7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -278,7 +278,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; - import javax.annotation.Nullable; public class Catalog { @@ -5496,6 +5495,7 @@ public class Catalog { public void modifyTableDynamicPartition(Database db, OlapTable table, Map properties) throws UserException { + convertDynamicPartitionReplicaNumToReplicaAllocation(properties); Map logProperties = new HashMap<>(properties); TableProperty tableProperty = table.getTableProperty(); if (tableProperty == null) { @@ -5517,6 +5517,14 @@ public class Catalog { editLog.logDynamicPartition(info); } + private void convertDynamicPartitionReplicaNumToReplicaAllocation(Map properties) { + if (properties.containsKey(DynamicPartitionProperty.REPLICATION_NUM)) { + Short repNum = Short.valueOf(properties.remove(DynamicPartitionProperty.REPLICATION_NUM)); + ReplicaAllocation replicaAlloc = new ReplicaAllocation(repNum); + properties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt()); + } + } + /** * Set replication number for unpartitioned table. 
* @param db diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java index e4e9ffc0ae..18e66b15f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletSchedCtx.java @@ -47,13 +47,13 @@ import org.apache.doris.thrift.TStorageMedium; import org.apache.doris.thrift.TTabletInfo; import org.apache.doris.thrift.TTaskType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.List; import java.util.Map; import java.util.Set; @@ -441,7 +441,13 @@ public class TabletSchedCtx implements Comparable { * database lock should be held. */ public boolean containsBE(long beId) { - String host = infoService.getBackend(beId).getHost(); + Backend backend = infoService.getBackend(beId); + if (backend == null) { + // containsBE() is currently only used for choosing dest backend to do clone task. + // return true so that it won't choose this backend. 
+ return true; + } + String host = backend.getHost(); for (Replica replica : tablet.getReplicas()) { Backend be = infoService.getBackend(replica.getBackendId()); if (be == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java index 2a858684df..907c917f83 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/clone/TabletScheduler.java @@ -53,9 +53,6 @@ import org.apache.doris.task.CloneTask; import org.apache.doris.task.DropReplicaTask; import org.apache.doris.thrift.TFinishTaskRequest; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.base.Preconditions; import com.google.common.collect.EvictingQueue; import com.google.common.collect.HashBasedTable; @@ -65,6 +62,9 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.collect.Table; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.Collection; import java.util.List; import java.util.Map; @@ -413,9 +413,10 @@ public class TabletScheduler extends MasterDaemon { tabletCtx.getTabletId(), e); stat.counterTabletScheduledFailed.incrementAndGet(); finalizeTabletCtx(tabletCtx, TabletSchedCtx.State.UNEXPECTED, e.getMessage()); + continue; } - Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING); + Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState()); stat.counterTabletScheduledSucceeded.incrementAndGet(); addToRunningTablets(tabletCtx); } @@ -1159,7 +1160,6 @@ public class TabletScheduler extends MasterDaemon { continue; } - List resultPaths = Lists.newArrayList(); BalanceStatus st = bes.isFit(tabletCtx.getTabletSize(), tabletCtx.getStorageMedium(), resultPaths, tabletCtx.getTabletStatus() != 
TabletStatus.REPLICA_RELOCATING diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 43e2fabe21..2eaf139268 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1485,4 +1485,14 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int table_name_length_limit = 64; + + /* + * The job scheduling interval of the schema change handler. + * The user should not set this parameter. + * This parameter is currently only used in the regression test environment to appropriately + * reduce the running speed of the schema change job to test the correctness of the system + * in the case of multiple tasks in parallel. + */ + @ConfField(mutable = false, masterOnly = true) + public static int default_schema_change_scheduler_interval_millisecond = 500; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java index 28a0872e6c..49806fc05d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java @@ -47,9 +47,6 @@ public class FeConstants { // default scheduler interval is 10 seconds public static int default_scheduler_interval_millisecond = 10000; - // default schema change scheduler interval is 500 millisecond - public static int default_schema_change_scheduler_interval_millisecond = 500; - // general model // Current meta data version. 
Use this version to write journals and image public static int meta_version = FeMetaVersion.VERSION_CURRENT; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java index 002636e455..13b4a6fc48 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeNameFormat.java @@ -49,7 +49,7 @@ public class FeNameFormat { public static void checkTableName(String tableName) throws AnalysisException { if (Strings.isNullOrEmpty(tableName) || !tableName.matches(COMMON_TABLE_NAME_REGEX) - || tableName.length() >= Config.table_name_length_limit) { + || tableName.length() > Config.table_name_length_limit) { ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_TABLE_NAME, tableName); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java index 09dc859594..174673bb88 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java @@ -211,6 +211,12 @@ public class DynamicPartitionUtil { } } + private static void checkReplicaAllocation(ReplicaAllocation replicaAlloc) throws DdlException { + if (replicaAlloc.getTotalReplicaNum() <= 0) { + ErrorReport.reportDdlException(ErrorCode.ERROR_DYNAMIC_PARTITION_REPLICATION_NUM_ZERO); + } + } + private static void checkHotPartitionNum(String val) throws DdlException { if (Strings.isNullOrEmpty(val)) { throw new DdlException("Invalid properties: " + DynamicPartitionProperty.HOT_PARTITION_NUM); @@ -433,6 +439,7 @@ public class DynamicPartitionUtil { if (properties.containsKey(DynamicPartitionProperty.REPLICATION_ALLOCATION)) { ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "dynamic_partition"); + 
+ checkReplicaAllocation(replicaAlloc); properties.remove(DynamicPartitionProperty.REPLICATION_ALLOCATION); analyzedProperties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java index 0f740dd0c8..3357e607b0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java @@ -343,7 +343,7 @@ public class BDBEnvironment { if (StringUtils.isNumeric(name)) { ret.add(Long.parseLong(name)); } else { - LOG.debug("get database names, skipped {}", name); + // LOG.debug("get database names, skipped {}", name); } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java index 4fb8b8987d..8f525ad0d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteHandler.java @@ -211,6 +211,12 @@ public class DeleteHandler implements Writable { idToDeleteJob.put(deleteJob.getTransactionId(), deleteJob); Catalog.getCurrentGlobalTransactionMgr().getCallbackFactory().addCallback(deleteJob); + TransactionState txnState = Catalog.getCurrentGlobalTransactionMgr().getTransactionState(db.getId(), transactionId); + // must call this to make sure we only handle the tablet in the mIndex we saw here. + // table may be under schema change or rollup, and the newly created tablets will not be checked later, + // to make sure that the delete transaction can be done successfully. 
+ txnState.addTableIndexes(olapTable); + // task sent to be AgentBatchTask batchTask = new AgentBatchTask(); // count total replica num diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java index 61dcad5125..09747df7b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/DeleteJob.java @@ -19,6 +19,7 @@ package org.apache.doris.load; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Replica; +import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; import org.apache.doris.common.FeConstants; import org.apache.doris.common.MetaNotFoundException; @@ -27,12 +28,12 @@ import org.apache.doris.task.PushTask; import org.apache.doris.transaction.AbstractTxnStateChangeCallback; import org.apache.doris.transaction.TransactionState; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import java.util.Collection; import java.util.Map; import java.util.Set; @@ -100,8 +101,24 @@ public class DeleteJob extends AbstractTxnStateChangeCallback { quorumTablets.add(tDeleteInfo.getTabletId()); } } - LOG.info("check delete job quorum, transaction id: {}, total tablets: {}, quorum tablets: {},", - signature, totalTablets.size(), quorumTablets.size()); + + int dropCounter = 0; + TabletInvertedIndex invertedIndex = Catalog.getCurrentInvertedIndex(); + for (long tabletId : totalTablets) { + if (invertedIndex.getTabletMeta(tabletId) == null) { + // tablet does not exist. + // This may happen during the delete operation, and the schema change task ends, + // causing the old tablet to be deleted. + // We think this situation is normal. 
In order to ensure that the delete task can end normally + // here we regard these deleted tablets as completed. + finishedTablets.add(tabletId); + dropCounter++; + LOG.warn("tablet {} has been dropped when checking delete job {}", tabletId, id); + } + } + + LOG.info("check delete job quorum, transaction id: {}, total tablets: {}, quorum tablets: {}, dropped tablets: {}", + signature, totalTablets.size(), quorumTablets.size(), dropCounter); if (finishedTablets.containsAll(totalTablets)) { setState(DeleteState.FINISHED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 507ea67881..2ded32b34a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -578,6 +578,7 @@ public class Coordinator { LOG.warn("catch a timeout exception", e); exception = e; code = TStatusCode.TIMEOUT; + BackendServiceProxy.getInstance().removeProxy(pair.first.brpcAddress); } if (code != TStatusCode.OK) { @@ -1823,6 +1824,7 @@ public class Coordinator { boolean hasCanceled; int profileFragmentId; RuntimeProfile profile; + TNetworkAddress brpcAddress; TNetworkAddress address; Backend backend; long lastMissingHeartbeatTime = -1; @@ -1901,7 +1903,7 @@ public class Coordinator { if (this.hasCanceled) { return false; } - TNetworkAddress brpcAddress = toBrpcHost(address); + brpcAddress = toBrpcHost(address); try { BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress, diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index ec3a4c7beb..9e5a6f85f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -544,7 +544,7 @@ public class StmtExecutor implements ProfileWriter { } else { try { parsedStmt.analyze(analyzer); - } catch 
(AnalysisException e) { + } catch (UserException e) { throw e; } catch (Exception e) { LOG.warn("Analyze failed because ", e); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index 2210645b1b..53b648f27d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -24,22 +24,24 @@ import org.apache.doris.thrift.TFoldConstantParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TUniqueId; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; import org.apache.thrift.TSerializer; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; - import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class BackendServiceProxy { private static final Logger LOG = LogManager.getLogger(BackendServiceProxy.class); private static volatile BackendServiceProxy INSTANCE; private final Map serviceMap; + private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); public BackendServiceProxy() { serviceMap = Maps.newHashMap(); @@ -56,14 +58,40 @@ public class BackendServiceProxy { return INSTANCE; } - private synchronized BackendServiceClient getProxy(TNetworkAddress address) { - BackendServiceClient service = serviceMap.get(address); + public void removeProxy(TNetworkAddress address) { + lock.writeLock().lock(); + try { + serviceMap.remove(address); + } finally { + lock.writeLock().unlock(); + } + } + + private BackendServiceClient getProxy(TNetworkAddress address) { + BackendServiceClient service; + lock.readLock().lock(); + try { + service = serviceMap.get(address); + } finally 
{ + lock.readLock().unlock(); + } + if (service != null) { return service; } - service = new BackendServiceClient(address); - serviceMap.put(address, service); - return service; + + // not exist, create one and return. + lock.writeLock().lock(); + try { + service = serviceMap.get(address); + if (service == null) { + service = new BackendServiceClient(address); + serviceMap.put(address, service); + } + return service; + } finally { + lock.writeLock().unlock(); + } } public Future execPlanFragmentAsync( diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java index a67245664d..efc2685d70 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/DynamicPartitionTableTest.java @@ -459,6 +459,14 @@ public class DynamicPartitionTableTest { Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException("default_cluster:test"); OlapTable table = (OlapTable) db.getTableOrAnalysisException(tableName); Assert.assertEquals(2, table.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation().getTotalReplicaNum()); + + String alter1 = "alter table test.dynamic_partition_replication_num set ('dynamic_partition.replication_num' = '1')"; + ExceptionChecker.expectThrowsNoException(() -> alterTable(alter1)); + Assert.assertEquals(1, table.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation().getTotalReplicaNum()); + + String alter2 = "alter table test.dynamic_partition_replication_num set ('dynamic_partition.replication_num' = '0')"; + ExceptionChecker.expectThrows(DdlException.class, () -> alterTable(alter2)); + Assert.assertEquals(1, table.getTableProperty().getDynamicPartitionProperty().getReplicaAllocation().getTotalReplicaNum()); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/FeNameFormatTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/common/FeNameFormatTest.java index 1acb264ff7..32678db556 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/FeNameFormatTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/FeNameFormatTest.java @@ -32,6 +32,9 @@ public class FeNameFormatTest { ExceptionChecker.expectThrows(AnalysisException.class, () -> FeNameFormat.checkColumnName("#id_")); ExceptionChecker.expectThrows(AnalysisException.class, () -> FeNameFormat.checkColumnName("@@timestamp")); ExceptionChecker.expectThrows(AnalysisException.class, () -> FeNameFormat.checkColumnName("@timestamp@")); + // length 64 + String tblName = "test_sys_partition_list_basic_test_list_partition_bigint_tb_uniq"; + ExceptionChecker.expectThrowsNoException(() -> FeNameFormat.checkTableName(tblName)); } }