diff --git a/be/src/olap/olap_table.cpp b/be/src/olap/olap_table.cpp index b47d678c05..2f6a48fe9f 100644 --- a/be/src/olap/olap_table.cpp +++ b/be/src/olap/olap_table.cpp @@ -760,30 +760,32 @@ void OLAPTable::load_pending_data() { break; } - if (_num_key_fields != pending_rowset.column_pruning_size()) { - LOG(WARNING) << "column pruning size is error when load pending data." - << "column_pruning_size=" << pending_rowset.column_pruning_size() << ", " - << "num_key_fields=" << _num_key_fields; - error_pending_data.insert(rowset->transaction_id()); - break; - } - std::vector> column_statistics_string(_num_key_fields); - std::vector null_vec(_num_key_fields); - for (size_t j = 0; j < _num_key_fields; ++j) { - ColumnPruning column_pruning = pending_rowset.column_pruning(j); - column_statistics_string[j].first = column_pruning.min(); - column_statistics_string[j].second = column_pruning.max(); - if (column_pruning.has_null_flag()) { - null_vec[j] = column_pruning.null_flag(); - } else { - null_vec[j] = false; + if (pending_rowset.column_pruning_size() != 0) { + if (_num_key_fields != pending_rowset.column_pruning_size()) { + LOG(WARNING) << "column pruning size is error when load pending data." 
+ << "column_pruning_size=" << pending_rowset.column_pruning_size() << ", " + << "num_key_fields=" << _num_key_fields; + error_pending_data.insert(rowset->transaction_id()); + break; + } + std::vector> column_statistics_string(_num_key_fields); + std::vector null_vec(_num_key_fields); + for (size_t j = 0; j < _num_key_fields; ++j) { + ColumnPruning column_pruning = pending_rowset.column_pruning(j); + column_statistics_string[j].first = column_pruning.min(); + column_statistics_string[j].second = column_pruning.max(); + if (column_pruning.has_null_flag()) { + null_vec[j] = column_pruning.null_flag(); + } else { + null_vec[j] = false; + } } - } - if (rowset->add_column_statistics(column_statistics_string, null_vec) != OLAP_SUCCESS) { - LOG(WARNING) << "fail to set column statistics when load pending data"; - error_pending_data.insert(pending_delta.transaction_id()); - break; + if (rowset->add_column_statistics(column_statistics_string, null_vec) != OLAP_SUCCESS) { + LOG(WARNING) << "fail to set column statistics when load pending data"; + error_pending_data.insert(pending_delta.transaction_id()); + break; + } } if (rowset->load() != OLAP_SUCCESS) { @@ -1182,32 +1184,34 @@ Rowset* OLAPTable::_construct_index_from_version(const PDelta* delta, int32_t ro return nullptr; } - if (_num_key_fields != prowset->column_pruning_size()) { - LOG(WARNING) << "column pruning size error, " << "table=" << full_name() << ", " + if (prowset->column_pruning_size() != 0) { + if (_num_key_fields != prowset->column_pruning_size()) { + LOG(WARNING) << "column pruning size error, " << "table=" << full_name() << ", " << "version=" << version.first << "-" << version.second << ", " << "version_hash=" << delta->version_hash() << ", " << "column_pruning_size=" << prowset->column_pruning_size() << ", " << "num_key_fields=" << _num_key_fields; - SAFE_DELETE(rowset); - return nullptr; - } - vector> column_statistic_strings(_num_key_fields); - std::vector null_vec(_num_key_fields); - for (size_t j = 
0; j < _num_key_fields; ++j) { - ColumnPruning column_pruning = prowset->column_pruning(j); - column_statistic_strings[j].first = column_pruning.min(); - column_statistic_strings[j].second = column_pruning.max(); - if (column_pruning.has_null_flag()) { - null_vec[j] = column_pruning.null_flag(); - } else { - null_vec[j] = false; + SAFE_DELETE(rowset); + return nullptr; + } + vector> column_statistic_strings(_num_key_fields); + std::vector null_vec(_num_key_fields); + for (size_t j = 0; j < _num_key_fields; ++j) { + ColumnPruning column_pruning = prowset->column_pruning(j); + column_statistic_strings[j].first = column_pruning.min(); + column_statistic_strings[j].second = column_pruning.max(); + if (column_pruning.has_null_flag()) { + null_vec[j] = column_pruning.null_flag(); + } else { + null_vec[j] = false; + } } - } - res = rowset->add_column_statistics(column_statistic_strings, null_vec); - if (res != OLAP_SUCCESS) { - SAFE_DELETE(rowset); - return nullptr; + res = rowset->add_column_statistics(column_statistic_strings, null_vec); + if (res != OLAP_SUCCESS) { + SAFE_DELETE(rowset); + return nullptr; + } } res = rowset->load(); diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index 07d9718f7e..4d445cc68d 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -17,6 +17,20 @@ package org.apache.doris.catalog; +import com.google.common.base.Joiner; +import com.google.common.base.Joiner.MapJoiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import com.google.common.collect.Range; +import com.google.common.collect.Sets; +import com.sleepycat.je.rep.InsufficientLogException; +import 
com.sleepycat.je.rep.NetworkRestore; +import com.sleepycat.je.rep.NetworkRestoreConfig; + import org.apache.doris.alter.Alter; import org.apache.doris.alter.AlterJob; import org.apache.doris.alter.AlterJob.JobType; @@ -96,6 +110,7 @@ import org.apache.doris.common.util.Daemon; import org.apache.doris.common.util.KuduUtil; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.common.util.QueryableReentrantLock; import org.apache.doris.common.util.Util; import org.apache.doris.consistency.ConsistencyChecker; import org.apache.doris.deploy.DeployManager; @@ -160,21 +175,6 @@ import org.apache.doris.thrift.TStorageType; import org.apache.doris.thrift.TTaskType; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.PublishVersionDaemon; - -import com.google.common.base.Joiner; -import com.google.common.base.Joiner.MapJoiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; -import com.google.common.collect.Range; -import com.google.common.collect.Sets; -import com.sleepycat.je.rep.InsufficientLogException; -import com.sleepycat.je.rep.NetworkRestore; -import com.sleepycat.je.rep.NetworkRestoreConfig; - import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.client.CreateTableOptions; @@ -208,7 +208,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; public class Catalog { private static final Logger LOG = LogManager.getLogger(Catalog.class); @@ -231,7 +230,8 @@ public class Catalog { // Operations like Get or Put do not need lock. 
// We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass // because fair lock has poor performance. - private ReentrantLock lock; + // Using QueryableReentrantLock to print owner thread in debug mode. + private QueryableReentrantLock lock; private ConcurrentHashMap idToDb; private ConcurrentHashMap fullNameToDb; @@ -374,7 +374,7 @@ public class Catalog { this.clone = new Clone(); this.alter = new Alter(); this.consistencyChecker = new ConsistencyChecker(); - this.lock = new ReentrantLock(true); + this.lock = new QueryableReentrantLock(true); this.backupHandler = new BackupHandler(this); this.metaDir = Config.meta_dir; this.userPropertyMgr = new UserPropertyMgr(); @@ -498,6 +498,14 @@ public class Catalog { while (true) { try { if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) { + if (LOG.isDebugEnabled()) { + // to see which thread held this lock for long time. + Thread owner = lock.getOwner(); + if (owner != null) { + LOG.debug("catalog lock is held by: {}", Util.dumpThread(owner, 10)); + } + } + if (mustLock) { continue; } else { diff --git a/fe/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java b/fe/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java new file mode 100644 index 0000000000..1f0283434f --- /dev/null +++ b/fe/src/main/java/org/apache/doris/common/util/QueryableReentrantLock.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.util; + +import java.util.concurrent.locks.ReentrantLock; + +/* + * ReentrantLock subclass whose only addition is a public getOwner(): + * ReentrantLock declares getOwner() protected, so it is re-exposed here unchanged. + */ +public class QueryableReentrantLock extends ReentrantLock { + private static final long serialVersionUID = 1L; + + public QueryableReentrantLock() { + super(); + } + + public QueryableReentrantLock(boolean fair) { + super(fair); + } + + @Override + public Thread getOwner() { + return super.getOwner(); + } +} diff --git a/fe/src/main/java/org/apache/doris/common/util/Util.java b/fe/src/main/java/org/apache/doris/common/util/Util.java index a96bc0a0b4..20807318fa 100644 --- a/fe/src/main/java/org/apache/doris/common/util/Util.java +++ b/fe/src/main/java/org/apache/doris/common/util/Util.java @@ -17,12 +17,10 @@ package org.apache.doris.common.util; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.PrimitiveType; -import org.apache.doris.common.Config; - import com.google.common.collect.Lists; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -308,5 +306,20 @@ public class Util { } return directory.delete(); } + + public static String dumpThread(Thread t, int lineNum) { // Returns the thread's name and id followed by its stack frames, one per line, stopping after lineNum frames. + StringBuilder sb = new StringBuilder(); + StackTraceElement[] elements = t.getStackTrace(); + sb.append("dump thread: ").append(t.getName()).append(", id: ").append(t.getId()).append("\n"); + int count = lineNum; // number of frames still allowed in the output + for 
(StackTraceElement element : elements) { + if (count == 0) { // frame budget used up + break; + } + sb.append(" ").append(element.toString()).append("\n"); + --count; + } + return sb.toString(); + } } diff --git a/fe/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java b/fe/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java new file mode 100644 index 0000000000..f8f7b2178f --- /dev/null +++ b/fe/src/test/java/org/apache/doris/common/util/QueryableReentrantLockTest.java @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.common.util;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+// Verifies that QueryableReentrantLock.getOwner() reports the thread that currently holds the lock.
+public class QueryableReentrantLockTest {
+
+    private QueryableReentrantLock lock = new QueryableReentrantLock(true);
+
+    @Test
+    public void test() throws InterruptedException {
+        // Latches replace the fixed sleeps, so ordering is deterministic and the test is fast.
+        final CountDownLatch locked = new CountDownLatch(1);
+        final CountDownLatch release = new CountDownLatch(1);
+        // JUnit only sees failures raised on the test thread, so thread2 hands its failure back.
+        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
+
+        Thread t1 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                lock.lock();
+                try {
+                    locked.countDown();
+                    release.await();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                } finally {
+                    lock.unlock();
+                }
+            }
+        }, "thread1");
+
+        Thread t2 = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    locked.await();
+                    Assert.assertFalse("lock is held by thread1", lock.tryLock(100, TimeUnit.MILLISECONDS));
+                    Thread owner = lock.getOwner();
+                    Assert.assertNotNull(owner);
+                    Assert.assertEquals("thread1", owner.getName());
+                    System.out.println(Util.dumpThread(owner, 10));
+                } catch (Throwable e) {
+                    failure.set(e);
+                } finally {
+                    release.countDown();
+                }
+            }
+        }, "thread2");
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+        Assert.assertNull("thread2 failed: " + failure.get(), failure.get());
+    }
+}