[bug](Fe) fix potential deadlock in show proc statement (#34988)
This commit is contained in:
@ -44,6 +44,7 @@ import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -52,6 +53,8 @@ import java.util.stream.Stream;
|
||||
*/
|
||||
public class DiagnoseClusterBalanceProcDir extends SubProcDir {
|
||||
|
||||
private ForkJoinPool taskPool = new ForkJoinPool();
|
||||
|
||||
@Override
|
||||
public List<DiagnoseItem> getDiagnoseResult() {
|
||||
long now = System.currentTimeMillis();
|
||||
@ -89,12 +92,14 @@ public class DiagnoseClusterBalanceProcDir extends SubProcDir {
|
||||
tabletHealth.status = DiagnoseStatus.OK;
|
||||
|
||||
Env env = Env.getCurrentEnv();
|
||||
List<DBTabletStatistic> statistics = env.getInternalCatalog().getDbIds().parallelStream()
|
||||
// skip information_schema database
|
||||
.flatMap(id -> Stream.of(id == 0 ? null : env.getInternalCatalog().getDbNullable(id)))
|
||||
.filter(Objects::nonNull).map(DBTabletStatistic::new)
|
||||
// sort by dbName
|
||||
.sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList());
|
||||
List<DBTabletStatistic> statistics = taskPool.submit(() ->
|
||||
env.getInternalCatalog().getDbIds().parallelStream()
|
||||
// skip information_schema database
|
||||
.flatMap(id -> Stream.of(id == 0 ? null : env.getInternalCatalog().getDbNullable(id)))
|
||||
.filter(Objects::nonNull).map(DBTabletStatistic::new)
|
||||
// sort by dbName
|
||||
.sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList())
|
||||
).join();
|
||||
|
||||
DBTabletStatistic total = statistics.stream().reduce(new DBTabletStatistic(), DBTabletStatistic::reduce);
|
||||
if (total.tabletNum != total.healthyNum) {
|
||||
|
||||
@ -35,6 +35,7 @@ import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -45,6 +46,8 @@ public class StatisticProcNode implements ProcNodeInterface {
|
||||
.build();
|
||||
private Env env;
|
||||
|
||||
private ForkJoinPool taskPool = new ForkJoinPool();
|
||||
|
||||
public StatisticProcNode(Env env) {
|
||||
Preconditions.checkNotNull(env);
|
||||
this.env = env;
|
||||
@ -52,12 +55,14 @@ public class StatisticProcNode implements ProcNodeInterface {
|
||||
|
||||
@Override
|
||||
public ProcResult fetchResult() throws AnalysisException {
|
||||
List<DBStatistic> statistics = env.getInternalCatalog().getDbIds().parallelStream()
|
||||
// skip information_schema database
|
||||
.flatMap(id -> Stream.of(id == 0 ? null : env.getCatalogMgr().getDbNullable(id)))
|
||||
.filter(Objects::nonNull).map(DBStatistic::new)
|
||||
// sort by dbName
|
||||
.sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList());
|
||||
List<DBStatistic> statistics = taskPool.submit(() ->
|
||||
env.getInternalCatalog().getDbIds().parallelStream()
|
||||
// skip information_schema database
|
||||
.flatMap(id -> Stream.of(id == 0 ? null : env.getCatalogMgr().getDbNullable(id)))
|
||||
.filter(Objects::nonNull).map(DBStatistic::new)
|
||||
// sort by dbName
|
||||
.sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList())
|
||||
).join();
|
||||
|
||||
List<List<String>> rows = new ArrayList<>(statistics.size() + 1);
|
||||
for (DBStatistic statistic : statistics) {
|
||||
|
||||
@ -47,6 +47,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@ -66,6 +67,8 @@ public class TabletHealthProcDir implements ProcDirInterface {
|
||||
|
||||
private Env env;
|
||||
|
||||
private ForkJoinPool taskPool = new ForkJoinPool();
|
||||
|
||||
public TabletHealthProcDir(Env env) {
|
||||
Preconditions.checkNotNull(env);
|
||||
this.env = env;
|
||||
@ -88,12 +91,14 @@ public class TabletHealthProcDir implements ProcDirInterface {
|
||||
|
||||
@Override
|
||||
public ProcResult fetchResult() throws AnalysisException {
|
||||
List<DBTabletStatistic> statistics = env.getInternalCatalog().getDbIds().parallelStream()
|
||||
// skip information_schema database
|
||||
.flatMap(id -> Stream.of(id == 0 ? null : env.getInternalCatalog().getDbNullable(id)))
|
||||
.filter(Objects::nonNull).map(DBTabletStatistic::new)
|
||||
// sort by dbName
|
||||
.sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList());
|
||||
List<DBTabletStatistic> statistics = taskPool.submit(() ->
|
||||
env.getInternalCatalog().getDbIds().parallelStream()
|
||||
// skip information_schema database
|
||||
.flatMap(id -> Stream.of(id == 0 ? null : env.getInternalCatalog().getDbNullable(id)))
|
||||
.filter(Objects::nonNull).map(DBTabletStatistic::new)
|
||||
// sort by dbName
|
||||
.sorted(Comparator.comparing(db -> db.db.getFullName())).collect(Collectors.toList())
|
||||
).join();
|
||||
|
||||
List<List<String>> rows = new ArrayList<>(statistics.size() + 1);
|
||||
for (DBTabletStatistic statistic : statistics) {
|
||||
|
||||
Reference in New Issue
Block a user