[feat](lock)add deadlock detection tool and monitored lock implementations #39015 (#39099)

## Proposed changes
#39015
### Description:

This change adds a deadlock detection tool and monitored lock
implementations to the project. These features help identify and debug
potential deadlocks and monitor lock usage. Features:


#### AbstractMonitoredLock:

A monitored version of Lock that tracks and logs lock acquisition and
release times.

Functionality:
Overrides the lock(), unlock(), tryLock(), and tryLock(long timeout,
TimeUnit unit) methods. Logs the lock acquisition time, the release
time, and any failure to acquire the lock within the specified timeout.

##### eg
```log
2024-08-07 12:02:59  [ Thread-2:2006 ] - [ WARN ]  Thread ID: 12, Thread Name: Thread-2 - Lock held for 1912 ms, exceeding hold timeout of 1000 ms 
Thread stack trace:
	at java.lang.Thread.getStackTrace(Thread.java:1564)
	at org.example.lock.AbstractMonitoredLock.afterUnlock(AbstractMonitoredLock.java:49)
	at org.example.lock.MonitoredReentrantLock.unlock(MonitoredReentrantLock.java:32)
	at org.example.ExampleService.timeout(ExampleService.java:17)
	at org.example.Main.lambda$test2$1(Main.java:39)
	at java.lang.Thread.run(Thread.java:750)
```
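The hold-time tracking described above can be sketched as follows. This is a minimal illustration, not the class added by this PR: the name `HoldTimeLockSketch`, the constructor-supplied threshold, and the `slowHoldCount()` counter are assumptions made for the example, and the warning goes to stderr instead of a logger.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of a lock that reports long hold times.
final class HoldTimeLockSketch extends ReentrantLock {
    private static final long serialVersionUID = 1L;

    private final long thresholdMs; // assumed hold-time limit in milliseconds
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();
    private final AtomicInteger slowHolds = new AtomicInteger();

    HoldTimeLockSketch(long thresholdMs) {
        this.thresholdMs = thresholdMs;
    }

    @Override
    public void lock() {
        super.lock();
        startNanos.set(System.nanoTime()); // start timing once the lock is held
    }

    @Override
    public void unlock() {
        Long start = startNanos.get();
        if (start != null) {
            long heldMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (heldMs > thresholdMs) {
                slowHolds.incrementAndGet();
                System.err.printf("Thread %s held the lock for %d ms (limit %d ms)%n",
                        Thread.currentThread().getName(), heldMs, thresholdMs);
            }
            startNanos.remove();
        }
        super.unlock();
    }

    int slowHoldCount() {
        return slowHolds.get();
    }

    // Holds the lock for sleepMs and returns how many slow holds were recorded.
    static int demo(long thresholdMs, long sleepMs) {
        HoldTimeLockSketch lock = new HoldTimeLockSketch(thresholdMs);
        lock.lock();
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
        return lock.slowHoldCount();
    }
}
```

Calling `HoldTimeLockSketch.demo(10, 60)` holds the lock for about 60 ms against a 10 ms limit, so one slow hold is recorded and a warning line is printed.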












#### DeadlockCheckerTool:

Uses ScheduledExecutorService for periodic deadlock checks. Logs
deadlock information including thread names, states, lock info, and
stack traces.

**ThreadMXBean reads thread information from the local JVM, which is
already in memory, so accessing it is cheaper than fetching data from
external resources such as disk or network. Thread state cache: the JVM
typically maintains a cache of thread states, reducing the need for
real-time calculation or additional data processing.**

##### eg
```log
Thread Name: Thread-0
Thread State: WAITING
Lock Name: java.util.concurrent.locks.ReentrantLock$NonfairSync@1d653213
Lock Owner Name: Thread-1
Lock Owner Id: 12
Waited Time: -1
Blocked Time: -1
Lock Info: java.util.concurrent.locks.ReentrantLock$NonfairSync@1d653213
Blocked by: java.util.concurrent.locks.ReentrantLock$NonfairSync@1d653213
Stack Trace: 
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
	at org.example.lock.MonitoredReentrantLock.lock(MonitoredReentrantLock.java:22)
	at org.example.Main.lambda$testDeadLock$3(Main.java:79)
	at org.example.Main$$Lambda$1/1221555852.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:750)


2024-08-07 14:11:28  [ pool-1-thread-1:2001 ] - [ WARN ]  Deadlocks detected:
Thread Name: Thread-1
Thread State: WAITING
Lock Name: java.util.concurrent.locks.ReentrantLock$NonfairSync@13a2dfcf
Lock Owner Name: Thread-0
Lock Owner Id: 11
Waited Time: -1
Blocked Time: -1
Lock Info: java.util.concurrent.locks.ReentrantLock$NonfairSync@13a2dfcf
Blocked by: java.util.concurrent.locks.ReentrantLock$NonfairSync@13a2dfcf
Stack Trace: 
	at sun.misc.Unsafe.park(Native Method)
	at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
	at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
	at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
	at org.example.lock.MonitoredReentrantLock.lock(MonitoredReentrantLock.java:22)
	at org.example.Main.lambda$testDeadLock$4(Main.java:93)
	at org.example.Main$$Lambda$2/1556956098.run(Unknown Source)
	at java.lang.Thread.run(Thread.java:750)


```
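A report like the one above can be reproduced with a small, self-contained probe. The sketch below is only an illustration under assumptions: the class name `DeadlockProbeSketch` and the thread-name prefix are invented for the example, and it provokes the deadlock itself by having two daemon threads take two `ReentrantLock`s in opposite order. The only management calls used are `ThreadMXBean.findDeadlockedThreads()` and `getThreadInfo(long[], boolean, boolean)`.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical probe: creates a two-thread deadlock, then detects it.
final class DeadlockProbeSketch {
    private static final String PREFIX = "deadlock-sketch-";

    private static void startDaemon(String name, ReentrantLock first, ReentrantLock second,
                                    CountDownLatch bothHoldFirst) {
        Thread t = new Thread(() -> {
            first.lock();
            bothHoldFirst.countDown();
            try {
                bothHoldFirst.await(); // wait until the other thread holds its first lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            second.lock(); // never returns: the other thread owns this lock
        }, name);
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        t.start();
    }

    // Counts deadlocked threads created by this sketch and prints their ThreadInfo.
    static int countDeadlockedSketchThreads() {
        ThreadMXBean bean = ManagementFactory.getThreadMXBean();
        long[] ids = bean.findDeadlockedThreads();
        if (ids == null) {
            return 0;
        }
        int count = 0;
        for (ThreadInfo info : bean.getThreadInfo(ids, true, true)) {
            if (info != null && info.getThreadName().startsWith(PREFIX)) {
                count++;
                System.err.print(info);
            }
        }
        return count;
    }

    // Provokes the deadlock and polls for up to five seconds until it is visible.
    static int demo() {
        ReentrantLock a = new ReentrantLock();
        ReentrantLock b = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        startDaemon(PREFIX + "0", a, b, bothHoldFirst);
        startDaemon(PREFIX + "1", b, a, bothHoldFirst);
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        int found = 0;
        try {
            while (found < 2 && System.nanoTime() < deadline) {
                Thread.sleep(20);
                found = countDeadlockedSketchThreads();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return found;
    }
}
```

`findDeadlockedThreads()` returns `null` when no cycle exists, which is why the null check comes before any further processing.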
##### benchmark
```
    @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
    @Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
    @Threads(1)

Benchmark                                                          Mode  Cnt       Score   Error   Units
LockBenchmark.testMonitoredLock                                   thrpt    2   15889.407          ops/ms
LockBenchmark.testMonitoredLock:·gc.alloc.rate                    thrpt    2     678.061          MB/sec
LockBenchmark.testMonitoredLock:·gc.alloc.rate.norm               thrpt    2      56.000            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space           thrpt    2     668.249          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space.norm      thrpt    2      55.080            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space       thrpt    2       0.075          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space.norm  thrpt    2       0.006            B/op
LockBenchmark.testMonitoredLock:·gc.count                         thrpt    2      20.000          counts
LockBenchmark.testMonitoredLock:·gc.time                          thrpt    2       6.000              ms
LockBenchmark.testNativeLock                                      thrpt    2  103130.635          ops/ms
LockBenchmark.testNativeLock:·gc.alloc.rate                       thrpt    2      ≈ 10⁻⁴          MB/sec
LockBenchmark.testNativeLock:·gc.alloc.rate.norm                  thrpt    2      ≈ 10⁻⁶            B/op
LockBenchmark.testNativeLock:·gc.count                            thrpt    2         ≈ 0          counts

    @Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
    @Measurement(iterations = 2, time = 2, timeUnit = TimeUnit.SECONDS)
    @Threads(100)

Benchmark                                                          Mode  Cnt       Score   Error   Units
LockBenchmark.testMonitoredLock                                   thrpt    2   10994.606          ops/ms
LockBenchmark.testMonitoredLock:·gc.alloc.rate                    thrpt    2     488.508          MB/sec
LockBenchmark.testMonitoredLock:·gc.alloc.rate.norm               thrpt    2      56.002            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space           thrpt    2     481.390          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Eden_Space.norm      thrpt    2      55.163            B/op
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space       thrpt    2       0.020          MB/sec
LockBenchmark.testMonitoredLock:·gc.churn.PS_Survivor_Space.norm  thrpt    2       0.002            B/op
LockBenchmark.testMonitoredLock:·gc.count                         thrpt    2      18.000          counts
LockBenchmark.testMonitoredLock:·gc.time                          thrpt    2       9.000              ms
LockBenchmark.testNativeLock                                      thrpt    2  558652.036          ops/ms
LockBenchmark.testNativeLock:·gc.alloc.rate                       thrpt    2       0.016          MB/sec
LockBenchmark.testNativeLock:·gc.alloc.rate.norm                  thrpt    2      ≈ 10⁻⁴            B/op
LockBenchmark.testNativeLock:·gc.count                            thrpt    2         ≈ 0          counts
```
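The throughput scores above translate into a slowdown factor by dividing the native score by the monitored score. The helper below is a small worked calculation using only numbers from the table; the class and method names are made up for the example. With only two measurement iterations and no error column, the resulting factors should be read as indicative.

```java
// Hypothetical helper: ratio of native-lock to monitored-lock throughput.
final class BenchmarkRatioSketch {
    static double slowdown(double nativeOpsPerMs, double monitoredOpsPerMs) {
        return nativeOpsPerMs / monitoredOpsPerMs;
    }

    public static void main(String[] args) {
        // Scores copied from the benchmark output (ops/ms).
        System.out.printf("1 thread:    %.2fx%n", slowdown(103130.635, 15889.407));
        System.out.printf("100 threads: %.2fx%n", slowdown(558652.036, 10994.606));
    }
}
```

This gives a factor of about 6.5 with one thread and about 51 with 100 threads.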

This commit is contained in:
Calvin Kirs
2024-08-08 21:15:49 +08:00
committed by GitHub
parent f8f5be7ce7
commit 30e2c3fb11
22 changed files with 482 additions and 116 deletions

View File

@ -27,6 +27,7 @@ import org.apache.doris.common.Log4jConfig;
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.Version;
import org.apache.doris.common.lock.DeadlockMonitor;
import org.apache.doris.common.util.JdkUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.HttpServer;
@ -60,6 +61,7 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.TimeUnit;
public class DorisFE {
private static final Logger LOG = LogManager.getLogger(DorisFE.class);
@ -95,6 +97,13 @@ public class DorisFE {
start(DORIS_HOME_DIR, PID_DIR, args, options);
}
private static void startMonitor() {
if (Config.enable_deadlock_detection) {
DeadlockMonitor deadlockMonitor = new DeadlockMonitor();
deadlockMonitor.startMonitoring(Config.deadlock_detection_interval_minute, TimeUnit.MINUTES);
}
}
// entrance for doris frontend
public static void start(String dorisHomeDir, String pidDir, String[] args, StartupOptions options) {
if (System.getenv("DORIS_LOG_TO_STDERR") != null) {
@ -216,7 +225,7 @@ public class DorisFE {
}
ThreadPoolManager.registerAllThreadPoolMetric();
startMonitor();
while (true) {
Thread.sleep(2000);
}

View File

@ -25,6 +25,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.ColocatePersistInfo;
@ -56,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -182,7 +182,7 @@ public class ColocateTableIndex implements Writable {
// save some error msg of the group for show. no need to persist
private Map<GroupId, String> group2ErrMsgs = Maps.newHashMap();
private transient ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private transient MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock();
public ColocateTableIndex() {

View File

@ -28,6 +28,7 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.CatalogIf;
@ -58,7 +59,6 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -83,7 +83,8 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
private long id;
@SerializedName(value = "fullQualifiedName")
private volatile String fullQualifiedName;
private final ReentrantReadWriteLock rwLock;
private MonitoredReentrantReadWriteLock rwLock;
// table family group map
private final Map<Long, Table> idToTable;
@ -133,7 +134,7 @@ public class Database extends MetaObject implements Writable, DatabaseIf<Table>
if (this.fullQualifiedName == null) {
this.fullQualifiedName = "";
}
this.rwLock = new ReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.idToTable = Maps.newConcurrentMap();
this.nameToTable = Maps.newConcurrentMap();
this.lowerCaseToTableName = Maps.newConcurrentMap();

View File

@ -111,6 +111,7 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
@ -122,7 +123,6 @@ import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantLock;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
@ -355,7 +355,7 @@ public class Env {
// We use fair ReentrantLock to avoid starvation. Do not use this lock in critical code pass
// because fair lock has poor performance.
// Using QueryableReentrantLock to print owner thread in debug mode.
private QueryableReentrantLock lock;
private MonitoredReentrantLock lock;
private CatalogMgr catalogMgr;
private GlobalFunctionMgr globalFunctionMgr;
@ -664,7 +664,7 @@ public class Env {
this.syncJobManager = new SyncJobManager();
this.alter = new Alter();
this.consistencyChecker = new ConsistencyChecker();
this.lock = new QueryableReentrantLock(true);
this.lock = new MonitoredReentrantLock(true);
this.backupHandler = new BackupHandler(this);
this.metaDir = Config.meta_dir;
this.publishVersionDaemon = new PublishVersionDaemon();

View File

@ -27,7 +27,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.QueryableReentrantReadWriteLock;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.persist.gson.GsonUtils;
@ -82,7 +82,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
protected TableType type;
@SerializedName(value = "createTime")
protected long createTime;
protected QueryableReentrantReadWriteLock rwLock;
protected MonitoredReentrantReadWriteLock rwLock;
/*
* fullSchema and nameToColumn should contains all columns, both visible and shadow.
@ -128,7 +128,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
this.type = type;
this.fullSchema = Lists.newArrayList();
this.nameToColumn = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();
}
@ -151,7 +151,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
// Only view in with-clause have null base
Preconditions.checkArgument(type == TableType.VIEW, "Table has no columns");
}
this.rwLock = new QueryableReentrantReadWriteLock(true);
this.rwLock = new MonitoredReentrantReadWriteLock(true);
this.createTime = Instant.now().getEpochSecond();
if (Config.check_table_lock_leaky) {
this.readLockThreads = Maps.newConcurrentMap();

View File

@ -26,6 +26,7 @@ import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
@ -51,7 +52,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@ -128,7 +128,7 @@ public class Tablet extends MetaObject implements Writable {
private long cooldownReplicaId = -1;
@SerializedName(value = "cooldownTerm")
private long cooldownTerm = -1;
private ReentrantReadWriteLock cooldownConfLock = new ReentrantReadWriteLock();
private MonitoredReentrantReadWriteLock cooldownConfLock = new MonitoredReentrantReadWriteLock();
// last time that the tablet checker checks this tablet.
// no need to persist

View File

@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.lock;
import org.apache.doris.common.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Abstract base class for a monitored lock that tracks lock acquisition,
* release, and attempt times. It provides mechanisms for monitoring the
* duration for which a lock is held and logging any instances where locks
* are held longer than a specified timeout or fail to be acquired within
* a specified timeout.
*/
public abstract class AbstractMonitoredLock {
private static final Logger LOG = LoggerFactory.getLogger(AbstractMonitoredLock.class);
// Thread-local variable to store the lock start time
private final ThreadLocal<Long> lockStartTime = new ThreadLocal<>();
/**
* Method to be called after successfully acquiring the lock.
* Sets the start time for the lock.
*/
protected void afterLock() {
lockStartTime.set(System.nanoTime());
}
/**
* Method to be called after releasing the lock.
* Calculates the lock hold time and logs a warning if it exceeds the hold timeout.
*/
protected void afterUnlock() {
Long startTime = lockStartTime.get();
if (startTime != null) {
long lockHoldTimeNanos = System.nanoTime() - startTime;
long lockHoldTimeMs = lockHoldTimeNanos >> 20;
if (lockHoldTimeMs > Config.max_lock_hold_threshold_seconds * 1000) {
Thread currentThread = Thread.currentThread();
String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
LOG.warn("Thread ID: {}, Thread Name: {} - Lock held for {} ms, exceeding hold timeout of {} ms "
+ "Thread stack trace:{}",
currentThread.getId(), currentThread.getName(), lockHoldTimeMs,
Config.max_lock_hold_threshold_seconds * 1000, stackTrace);
}
lockStartTime.remove();
}
}
/**
* Method to be called after attempting to acquire the lock using tryLock.
* Logs a warning if the lock was not acquired within a reasonable time.
*
* @param acquired Whether the lock was successfully acquired
* @param startTime The start time of the lock attempt
*/
protected void afterTryLock(boolean acquired, long startTime) {
if (acquired) {
afterLock();
return;
}
if (LOG.isDebugEnabled()) {
long elapsedTime = (System.nanoTime() - startTime) >> 20;
Thread currentThread = Thread.currentThread();
String stackTrace = getThreadStackTrace(currentThread.getStackTrace());
LOG.debug("Thread ID: {}, Thread Name: {} - Failed to acquire the lock within {} ms"
+ "\nThread blocking info:\n{}",
currentThread.getId(), currentThread.getName(), elapsedTime, stackTrace);
}
}
/**
* Utility method to format the stack trace of a thread.
*
* @param stackTrace The stack trace elements of the thread
* @return A formatted string of the stack trace
*/
private String getThreadStackTrace(StackTraceElement[] stackTrace) {
StringBuilder sb = new StringBuilder();
for (StackTraceElement element : stackTrace) {
sb.append("\tat ").append(element).append("\n");
}
return sb.toString().replace("\n", "\\n");
}
}
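A note on the `>> 20` conversion used in afterUnlock() and afterTryLock(): shifting right by 20 bits divides by 1,048,576 instead of 1,000,000, so the reported millisecond values come out roughly 4.6 % low. The sketch below compares the shift with `TimeUnit.NANOSECONDS.toMillis`; the class and method names are assumptions for illustration.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical comparison of the bit-shift approximation with the exact conversion.
final class NanosToMillisSketch {
    static long shiftMillis(long nanos) {
        return nanos >> 20; // divides by 2^20 = 1,048,576
    }

    static long exactMillis(long nanos) {
        return TimeUnit.NANOSECONDS.toMillis(nanos); // divides by 1,000,000
    }

    public static void main(String[] args) {
        long twoSeconds = 2_000_000_000L;
        System.out.println(shiftMillis(twoSeconds)); // 1907
        System.out.println(exactMillis(twoSeconds)); // 2000
    }
}
```

The shift avoids a division on the unlock path, at the cost of a threshold that fires slightly later than configured.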

View File

@ -0,0 +1,81 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.lock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* A utility class for monitoring and reporting deadlocks in a Java application.
* <p>
* This class uses the Java Management API to periodically check for deadlocked threads
* and logs detailed information about any detected deadlocks. It can be configured to
* run at a fixed interval.
* </p>
*/
public class DeadlockMonitor {
private static final Logger LOG = LoggerFactory.getLogger(DeadlockMonitor.class);
private final ThreadMXBean threadMXBean;
private final ScheduledExecutorService scheduler;
public DeadlockMonitor() {
this.threadMXBean = ManagementFactory.getThreadMXBean();
this.scheduler = Executors.newScheduledThreadPool(1);
}
/**
* Starts monitoring for deadlocks at a fixed rate.
*
* @param period the period between successive executions
* @param unit the time unit of the period parameter
*/
public void startMonitoring(long period, TimeUnit unit) {
scheduler.scheduleAtFixedRate(this::detectAndReportDeadlocks, 5, period, unit);
}
/**
* Detects and reports deadlocks if any are found.
*/
public void detectAndReportDeadlocks() {
// Get IDs of threads that are deadlocked
long[] deadlockedThreadIds = threadMXBean.findDeadlockedThreads();
// Check if there are no deadlocked threads
if (deadlockedThreadIds == null || deadlockedThreadIds.length == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("No deadlocks detected.");
}
return;
}
// Get information about deadlocked threads
ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(deadlockedThreadIds, true, true);
String deadlockReportString = Arrays.toString(threadInfos).replace("\n", "\\n");
// Log the deadlock report
LOG.warn("Deadlocks detected {}", deadlockReportString);
}
}
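In startMonitoring() the initial delay of 5 is expressed in the same unit as the period, so with TimeUnit.MINUTES the first check runs five minutes after startup. The sketch below shows the same `scheduleAtFixedRate` pattern in a form that can be stopped and that survives a failing check. It is a hypothetical variant, not part of this commit: the class name, the daemon thread factory, and the `stop()` method are assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical periodic checker with a daemon worker thread and explicit shutdown.
final class PeriodicCheckSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "periodic-check-sketch");
                t.setDaemon(true); // do not block JVM exit
                return t;
            });

    void start(Runnable check, long initialDelay, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                check.run();
            } catch (RuntimeException e) {
                // An uncaught exception would suppress all later executions.
                System.err.println("check failed: " + e);
            }
        }, initialDelay, period, unit);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    // Runs a trivial check every 10 ms and reports whether it ran three times.
    static boolean demo() {
        CountDownLatch threeRuns = new CountDownLatch(3);
        PeriodicCheckSketch sketch = new PeriodicCheckSketch();
        sketch.start(threeRuns::countDown, 0, 10, TimeUnit.MILLISECONDS);
        try {
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.stop();
        }
    }
}
```

Catching exceptions inside the task matters because `scheduleAtFixedRate` cancels subsequent runs once one execution throws.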

View File

@ -0,0 +1,98 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.lock;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
/**
* A monitored version of ReentrantLock that provides additional monitoring capabilities
* for lock acquisition and release.
*/
public class MonitoredReentrantLock extends ReentrantLock {
private static final long serialVersionUID = 1L;
// Monitor for tracking lock acquisition and release
private final AbstractMonitoredLock lockMonitor = new AbstractMonitoredLock() {
};
// Constructor for creating a monitored lock with fairness option
public MonitoredReentrantLock(boolean fair) {
super(fair);
}
// Constructor for creating a monitored lock with the default (non-fair) policy
public MonitoredReentrantLock() {
}
/**
* Acquires the lock.
* Records the time when the lock is acquired.
*/
@Override
public void lock() {
super.lock();
lockMonitor.afterLock();
}
/**
* Releases the lock.
* Records the time when the lock is released and logs the duration.
*/
@Override
public void unlock() {
lockMonitor.afterUnlock();
super.unlock();
}
/**
* Tries to acquire the lock.
* Records the time when the lock attempt started and logs the result.
*
* @return true if the lock was acquired, false otherwise
*/
@Override
public boolean tryLock() {
long start = System.nanoTime(); // Record start time
boolean acquired = super.tryLock(); // Attempt to acquire the lock
lockMonitor.afterTryLock(acquired, start); // Log result and elapsed time
return acquired;
}
/**
* Tries to acquire the lock within the specified time limit.
* Records the time when the lock attempt started and logs the result.
*
* @param timeout the time to wait for the lock
* @param unit the time unit of the timeout argument
* @return true if the lock was acquired, false if the waiting time elapsed before the lock was acquired
* @throws InterruptedException if the current thread is interrupted while waiting
*/
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
long start = System.nanoTime(); // Record start time
boolean acquired = super.tryLock(timeout, unit); // Attempt to acquire the lock
lockMonitor.afterTryLock(acquired, start); // Log result and elapsed time
return acquired;
}
@Override
public Thread getOwner() {
return super.getOwner();
}
}
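The timed tryLock path can be exercised with a second thread that holds the lock while the caller waits. The sketch below uses a plain `ReentrantLock` and measures the wait itself, mirroring what tryLock(long, TimeUnit) and afterTryLock() do together; note that afterTryLock() reports failures only when debug logging is enabled. The class name and helper are assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demonstration of a timed tryLock that fails under contention.
final class TryLockTimeoutSketch {
    // Returns true if the lock was acquired within timeoutMs while another thread holds it.
    static boolean contendedTryLock(long timeoutMs) {
        ReentrantLock lock = new ReentrantLock(true);
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        Thread holder = new Thread(() -> {
            lock.lock();
            try {
                held.countDown();
                release.await(); // keep the lock until the caller is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }, "holder-sketch");
        holder.setDaemon(true);
        holder.start();
        try {
            held.await();
            long start = System.nanoTime();
            boolean acquired = lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
            long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            if (acquired) {
                lock.unlock();
            } else {
                System.err.printf("Failed to acquire the lock within %d ms%n", waitedMs);
            }
            return acquired;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
        }
    }
}
```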

View File

@ -0,0 +1,137 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A monitored version of ReentrantReadWriteLock that provides additional
* monitoring capabilities for read and write locks.
*/
public class MonitoredReentrantReadWriteLock extends ReentrantReadWriteLock {
// Monitored read and write lock instances
private final ReadLock readLock = new ReadLock(this);
private final WriteLock writeLock = new WriteLock(this);
// Constructor for creating a monitored lock with fairness option
public MonitoredReentrantReadWriteLock(boolean fair) {
super(fair);
}
public MonitoredReentrantReadWriteLock() {
}
/**
* Monitored read lock class that extends ReentrantReadWriteLock.ReadLock.
*/
public class ReadLock extends ReentrantReadWriteLock.ReadLock {
private static final long serialVersionUID = 1L;
private final AbstractMonitoredLock monitor = new AbstractMonitoredLock() {};
/**
* Constructs a new ReadLock instance.
*
* @param lock The ReentrantReadWriteLock this lock is associated with
*/
protected ReadLock(ReentrantReadWriteLock lock) {
super(lock);
}
/**
* Acquires the read lock.
* Records the time when the lock is acquired.
*/
@Override
public void lock() {
super.lock();
monitor.afterLock();
}
/**
* Releases the read lock.
* Records the time when the lock is released and logs the duration.
*/
@Override
public void unlock() {
monitor.afterUnlock();
super.unlock();
}
}
/**
* Monitored write lock class that extends ReentrantReadWriteLock.WriteLock.
*/
public class WriteLock extends ReentrantReadWriteLock.WriteLock {
private static final long serialVersionUID = 1L;
private final AbstractMonitoredLock monitor = new AbstractMonitoredLock() {};
/**
* Constructs a new WriteLock instance.
*
* @param lock The ReentrantReadWriteLock this lock is associated with
*/
protected WriteLock(ReentrantReadWriteLock lock) {
super(lock);
}
/**
* Acquires the write lock.
* Records the time when the lock is acquired.
*/
@Override
public void lock() {
super.lock();
monitor.afterLock();
}
/**
* Releases the write lock.
* Records the time when the lock is released and logs the duration.
*/
@Override
public void unlock() {
monitor.afterUnlock();
super.unlock();
}
}
/**
* Returns the read lock associated with this lock.
*
* @return The monitored read lock
*/
@Override
public ReadLock readLock() {
return readLock;
}
/**
* Returns the write lock associated with this lock.
*
* @return The monitored write lock
*/
@Override
public WriteLock writeLock() {
return writeLock;
}
@Override
public Thread getOwner() {
return super.getOwner();
}
}
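The class above relies on covariant overrides of readLock() and writeLock() so that every caller receives the instrumented lock objects. In this diff the nested ReadLock and WriteLock override only lock() and unlock(). The sketch below isolates the pattern with a counter instead of timing; `CountingReadWriteLockSketch`, `CountingReadLock`, and `readCount()` are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch: a read-write lock whose read lock counts acquisitions.
final class CountingReadWriteLockSketch extends ReentrantReadWriteLock {
    private static final long serialVersionUID = 1L;

    private final AtomicInteger readAcquisitions = new AtomicInteger();
    private final CountingReadLock countingReadLock = new CountingReadLock(this);

    final class CountingReadLock extends ReentrantReadWriteLock.ReadLock {
        private static final long serialVersionUID = 1L;

        CountingReadLock(ReentrantReadWriteLock lock) {
            super(lock); // shares the synchronizer of the enclosing lock
        }

        @Override
        public void lock() {
            super.lock();
            readAcquisitions.incrementAndGet();
        }
    }

    // Covariant return type: callers get the instrumented read lock.
    @Override
    public CountingReadLock readLock() {
        return countingReadLock;
    }

    int readCount() {
        return readAcquisitions.get();
    }
}
```

Because readLock() is overridden, code that holds the lock through a plain `ReentrantReadWriteLock` reference still reaches the instrumented instance via dynamic dispatch.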

View File

@ -1,41 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import java.util.concurrent.locks.ReentrantLock;
/*
* This Lock is for exposing the getOwner() method,
* which is a protected method of ReentrantLock
*/
public class QueryableReentrantLock extends ReentrantLock {
private static final long serialVersionUID = 1L;
public QueryableReentrantLock() {
super();
}
public QueryableReentrantLock(boolean fair) {
super(fair);
}
@Override
public Thread getOwner() {
return super.getOwner();
}
}

View File

@ -1,41 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.util;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/*
* This Lock is for exposing the getOwner() method,
* which is a protected method of ReentrantLock
*/
public class QueryableReentrantReadWriteLock extends ReentrantReadWriteLock {
private static final long serialVersionUID = 1L;
public QueryableReentrantReadWriteLock() {
super();
}
public QueryableReentrantReadWriteLock(boolean fair) {
super(fair);
}
@Override
public Thread getOwner() {
return super.getOwner();
}
}

View File

@ -41,6 +41,7 @@ import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
@ -70,7 +71,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -90,7 +90,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
private static final String YES = "yes";
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true);
@SerializedName(value = "idToCatalog")
private final Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> idToCatalog = Maps.newConcurrentMap();

@ -28,6 +28,7 @@ import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaTable;
@ -58,7 +59,6 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Base class of external database.
@ -69,7 +69,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
implements DatabaseIf<T>, Writable, GsonPostProcessable {
private static final Logger LOG = LogManager.getLogger(ExternalDatabase.class);
protected ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);
protected MonitoredReentrantReadWriteLock rwLock = new MonitoredReentrantReadWriteLock(true);
@SerializedName(value = "id")
protected long id;
@ -446,7 +446,7 @@ public abstract class ExternalDatabase<T extends ExternalTable>
}
}
idToTbl = tmpIdToTbl;
rwLock = new ReentrantReadWriteLock(true);
rwLock = new MonitoredReentrantReadWriteLock(true);
}
@Override

@ -124,13 +124,13 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.util.DbUtil;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.IdGeneratorUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.QueryableReentrantLock;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
@ -209,7 +209,7 @@ public class InternalCatalog implements CatalogIf<Database> {
private static final Logger LOG = LogManager.getLogger(InternalCatalog.class);
private QueryableReentrantLock lock = new QueryableReentrantLock(true);
private MonitoredReentrantLock lock = new MonitoredReentrantLock(true);
private ConcurrentHashMap<Long, Database> idToDb = new ConcurrentHashMap<>();
private ConcurrentHashMap<String, Database> fullNameToDb = new ConcurrentHashMap<>();
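
The hunk above swaps `QueryableReentrantLock` for `MonitoredReentrantLock`. According to the PR description, the monitored locks log a warning when a lock is held longer than a hold timeout. As a rough illustration of that idea only, and not the implementation added by this PR, here is a minimal sketch. The class name `HoldTimeLoggingLock`, the injected `nanoClock`, and the `getSlowHolds()` counter are all assumptions made so the sketch stays small and testable. It also covers only `lock()`, `tryLock()`, and `unlock()`; the timed and interruptible acquire methods are left out.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;

// Hypothetical sketch; NOT the MonitoredReentrantLock from this PR.
class HoldTimeLoggingLock extends ReentrantLock {
    private static final long serialVersionUID = 1L;

    private final long holdTimeoutMs;
    private final transient LongSupplier nanoClock;   // e.g. System::nanoTime
    private final AtomicInteger slowHolds = new AtomicInteger();
    private long acquiredAtNanos;                     // only touched by the owning thread

    HoldTimeLoggingLock(boolean fair, long holdTimeoutMs, LongSupplier nanoClock) {
        super(fair);
        this.holdTimeoutMs = holdTimeoutMs;
        this.nanoClock = nanoClock;
    }

    @Override
    public void lock() {
        super.lock();
        afterLock();
    }

    @Override
    public boolean tryLock() {
        boolean acquired = super.tryLock();
        if (acquired) {
            afterLock();
        }
        return acquired;
    }

    @Override
    public void unlock() {
        beforeUnlock();
        super.unlock();
    }

    // Record the start time on the outermost acquisition only, so reentrant
    // acquisitions do not reset the clock.
    private void afterLock() {
        if (getHoldCount() == 1) {
            acquiredAtNanos = nanoClock.getAsLong();
        }
    }

    // Measure while the lock is still held, so acquiredAtNanos cannot be
    // overwritten by the next owner before it is read.
    private void beforeUnlock() {
        if (isHeldByCurrentThread() && getHoldCount() == 1) {
            long heldMs = TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - acquiredAtNanos);
            if (heldMs > holdTimeoutMs) {
                slowHolds.incrementAndGet();
                System.err.println("Thread " + Thread.currentThread().getName()
                        + " held lock for " + heldMs + " ms, exceeding hold timeout of "
                        + holdTimeoutMs + " ms");
            }
        }
    }

    int getSlowHolds() {
        return slowHolds.get();
    }
}
```

A caller would construct it as `new HoldTimeLoggingLock(true, 1000L, System::nanoTime)` and use it like any `ReentrantLock`. Injecting the clock is purely a testing convenience in this sketch.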

@ -23,6 +23,7 @@ import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.planner.ColumnBound;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PartitionPrunerV2Base.UniqueId;
@ -43,15 +44,13 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
@Data
public class TablePartitionValues {
public static final String HIVE_DEFAULT_PARTITION = "__HIVE_DEFAULT_PARTITION__";
private final ReadWriteLock readWriteLock;
private final MonitoredReentrantReadWriteLock readWriteLock;
private long lastUpdateTimestamp;
private long nextPartitionId;
private final Map<Long, PartitionItem> idToPartitionItem;
@ -68,7 +67,7 @@ public class TablePartitionValues {
private Map<UniqueId, Range<ColumnBound>> singleUidToColumnRangeMap;
public TablePartitionValues() {
readWriteLock = new ReentrantReadWriteLock();
readWriteLock = new MonitoredReentrantReadWriteLock();
lastUpdateTimestamp = 0;
nextPartitionId = 0;
idToPartitionItem = new HashMap<>();

@ -23,6 +23,7 @@ import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.InsertJob;
@ -33,7 +34,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -41,7 +41,7 @@ import java.util.stream.Collectors;
*/
public class LabelProcessor {
private final Map<Long, Map<String, List<InsertJob>>> dbIdToLabelToLoadJobs = new ConcurrentHashMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final MonitoredReentrantReadWriteLock lock = new MonitoredReentrantReadWriteLock(true);
private void readLock() {
lock.readLock().lock();

@ -18,6 +18,7 @@
package org.apache.doris.qe.cache;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.SimpleScheduler;
import org.apache.doris.system.Backend;
@ -33,8 +34,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Use consistent hashing to find the BE corresponding to the key to
@ -47,7 +46,7 @@ public class CacheCoordinator {
public boolean debugModel = false;
private Hashtable<Long, Backend> realNodes = new Hashtable<>();
private SortedMap<Long, Backend> virtualNodes = new TreeMap<>();
private static Lock belock = new ReentrantLock();
private static MonitoredReentrantLock belock = new MonitoredReentrantLock();
private long lastRefreshTime;
private static CacheCoordinator cachePartition;

@ -44,6 +44,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.lock.MonitoredReentrantReadWriteLock;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.InternalDatabaseUtil;
@ -90,7 +91,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
@ -119,7 +119,7 @@ public class DatabaseTransactionMgr {
// the lock is used to control the access to transaction states
// no other locks should be inside this lock
private final ReentrantReadWriteLock transactionLock = new ReentrantReadWriteLock(true);
private final MonitoredReentrantReadWriteLock transactionLock = new MonitoredReentrantReadWriteLock(true);
// transactionId -> running TransactionState
private final Map<Long, TransactionState> idToRunningTransactionState = Maps.newHashMap();