[opt](hudi) reduce the memory usage of avro reader (#23745)
1. Reduce the number of threads reading avro logs by keeping the readers in a fixed thread pool. 2. Regularly clean the cached resolvers in the thread-local map via reflection.
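Background for the two fixes: Avro's `GenericDatumReader` keeps its resolving-decoder cache in a `ThreadLocal`, so every distinct thread that touches a reader accumulates its own cache. The sketch below (hypothetical names, not the committed code) shows the bounded-pool idea in isolation: routing all avro reads through one fixed pool caps the number of per-thread caches at the pool size.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BoundedAvroReadSketch {
    // One shared fixed pool instead of a fresh thread per split (sketch only);
    // the pool size mirrors the sizing used in this commit.
    private static final ExecutorService POOL = Executors.newFixedThreadPool(
            Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4));

    public static void main(String[] args) throws Exception {
        // Each task reuses a pooled thread, and therefore reuses (rather than
        // multiplies) that thread's thread-local resolver cache.
        POOL.submit(() ->
                System.out.println("read avro log on " + Thread.currentThread().getName())).get();
        POOL.shutdown();
    }
}
```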
@@ -22,6 +22,9 @@ import org.apache.doris.common.jni.JniScanner;
 import org.apache.doris.common.jni.vec.ColumnType;
 import org.apache.doris.common.jni.vec.ScanPredicate;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.util.WeakIdentityHashMap;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Logger;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -30,14 +33,21 @@ import scala.collection.Iterator;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
 /**
@@ -56,6 +66,56 @@ public class HudiJniScanner extends JniScanner {
     private long getRecordReaderTimeNs = 0;
     private Iterator<InternalRow> recordIterator;
 
+    /**
+     * Avro's `GenericDatumReader` keeps a thread local map that stores a `WeakIdentityHashMap`.
+     * The `WeakIdentityHashMap` caches the avro resolving decoders, and a cached resolver can only be
+     * cleaned after its avro schema has been recycled and become a weak reference. However, the behavior
+     * of the weak reference queue of `WeakIdentityHashMap` is unpredictable, and the decoder is very
+     * memory intensive, so the number of threads touching the thread local map cannot be too large.
+     * Two solutions:
+     * 1. Reduce the number of threads reading avro logs and keep the readers in a fixed thread pool.
+     * 2. Regularly clean the cached resolvers in the thread local map by reflection.
+     */
+    private static final AtomicLong lastUpdateTime = new AtomicLong(System.currentTimeMillis());
+    private static final long RESOLVER_TIME_OUT = 60000;
+    private static final ExecutorService avroReadPool;
+    private static ThreadLocal<WeakIdentityHashMap<?, ?>> AVRO_RESOLVER_CACHE;
+    private static final Map<Long, WeakIdentityHashMap<?, ?>> cachedResolvers = new ConcurrentHashMap<>();
+    private static final ReadWriteLock cleanResolverLock = new ReentrantReadWriteLock();
+    private static final ScheduledExecutorService cleanResolverService = Executors.newScheduledThreadPool(1);
+
+    static {
+        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 2 + 1, 4);
+        avroReadPool = Executors.newFixedThreadPool(numThreads,
+                new ThreadFactoryBuilder().setNameFormat("avro-log-reader-%d").build());
+        LOG.info("Create " + numThreads + " daemon threads to load avro logs");
+
+        Class<?> avroReader = GenericDatumReader.class;
+        try {
+            Field field = avroReader.getDeclaredField("RESOLVER_CACHE");
+            field.setAccessible(true);
+            AVRO_RESOLVER_CACHE = (ThreadLocal<WeakIdentityHashMap<?, ?>>) field.get(null);
+            LOG.info("Get the resolver cache for avro reader");
+        } catch (Exception e) {
+            AVRO_RESOLVER_CACHE = null;
+            LOG.warn("Failed to get the resolver cache for avro reader");
+        }
+
+        cleanResolverService.scheduleAtFixedRate(() -> {
+            cleanResolverLock.writeLock().lock();
+            try {
+                if (System.currentTimeMillis() - lastUpdateTime.get() > RESOLVER_TIME_OUT) {
+                    for (WeakIdentityHashMap<?, ?> solver : cachedResolvers.values()) {
+                        solver.clear();
+                    }
+                    lastUpdateTime.set(System.currentTimeMillis());
+                }
+            } finally {
+                cleanResolverLock.writeLock().unlock();
+            }
+        }, RESOLVER_TIME_OUT, RESOLVER_TIME_OUT, TimeUnit.MILLISECONDS);
+    }
+
     public HudiJniScanner(int fetchSize, Map<String, String> params) {
         debugString = params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
                 .collect(Collectors.joining("\n"));
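The reflection in the static block above works because `RESOLVER_CACHE` is a private static field of `GenericDatumReader`; a static field is read with `Field.get(null)`. Note that a `ThreadLocal` read through reflection only exposes the calling thread's map, which is why each pooled reader later registers its own map in `cachedResolvers` so the single cleaner thread can clear all of them. A self-contained sketch of the pattern against a stand-in class (`Holder` is hypothetical, standing in for `GenericDatumReader`):

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class StaticThreadLocalPeek {
    static class Holder { // stand-in for GenericDatumReader
        private static final ThreadLocal<Map<String, String>> CACHE =
                ThreadLocal.withInitial(HashMap::new);
        static void put(String k, String v) { CACHE.get().put(k, v); }
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        Holder.put("k", "v");
        Field field = Holder.class.getDeclaredField("CACHE");
        field.setAccessible(true);
        // Static field: no instance needed, read with get(null).
        ThreadLocal<Map<String, String>> cache = (ThreadLocal<Map<String, String>>) field.get(null);
        // The current thread's map can now be cleared on demand instead of
        // waiting for the unpredictable weak-reference queue to drain it.
        System.out.println(cache.get()); // {k=v}
        cache.get().clear();
        System.out.println(cache.get()); // {}
    }
}
```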
@@ -84,46 +144,62 @@ public class HudiJniScanner extends JniScanner {
 
     @Override
     public void open() throws IOException {
-        Thread.currentThread().setContextClassLoader(classLoader);
-        initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
-        long startTime = System.nanoTime();
-        // RecordReader will use ProcessBuilder to start a hotspot process, which may get stuck,
-        // so use another process to kill this stuck process.
-        // TODO(gaoxin): better way to solve the stuck process?
-        AtomicBoolean isKilled = new AtomicBoolean(false);
-        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
-        executorService.scheduleAtFixedRate(() -> {
-            if (!isKilled.get()) {
-                synchronized (HudiJniScanner.class) {
-                    List<Long> pids = Utils.getChildProcessIds(
-                            Utils.getCurrentProcId());
-                    for (long pid : pids) {
-                        String cmd = Utils.getCommandLine(pid);
-                        if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
-                            Utils.killProcess(pid);
-                            isKilled.set(true);
-                            LOG.info("Kill hotspot debugger process " + pid);
-                        }
-                    }
-                }
-            }
-        }, 100, 1000, TimeUnit.MILLISECONDS);
-        try {
-            if (ugi != null) {
-                recordIterator = ugi.doAs(
-                        (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
-                                split).buildScanIterator(split.requiredFields(), new Filter[0]));
-            } else {
-                recordIterator = new MORSnapshotSplitReader(split)
-                        .buildScanIterator(split.requiredFields(), new Filter[0]);
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
-            throw new IOException(e.getMessage(), e);
-        }
-        isKilled.set(true);
-        executorService.shutdownNow();
-        getRecordReaderTimeNs += System.nanoTime() - startTime;
+        Future<?> avroFuture = avroReadPool.submit(() -> {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            initTableInfo(split.requiredTypes(), split.requiredFields(), predicates, fetchSize);
+            long startTime = System.nanoTime();
+            // RecordReader will use ProcessBuilder to start a hotspot process, which may get stuck,
+            // so use another process to kill this stuck process.
+            // TODO(gaoxin): better way to solve the stuck process?
+            AtomicBoolean isKilled = new AtomicBoolean(false);
+            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
+            executorService.scheduleAtFixedRate(() -> {
+                if (!isKilled.get()) {
+                    synchronized (HudiJniScanner.class) {
+                        List<Long> pids = Utils.getChildProcessIds(
+                                Utils.getCurrentProcId());
+                        for (long pid : pids) {
+                            String cmd = Utils.getCommandLine(pid);
+                            if (cmd != null && cmd.contains("org.openjdk.jol.vm.sa.AttachMain")) {
+                                Utils.killProcess(pid);
+                                isKilled.set(true);
+                                LOG.info("Kill hotspot debugger process " + pid);
+                            }
+                        }
+                    }
+                }
+            }, 100, 1000, TimeUnit.MILLISECONDS);
+
+            cleanResolverLock.readLock().lock();
+            try {
+                lastUpdateTime.set(System.currentTimeMillis());
+                if (ugi != null) {
+                    recordIterator = ugi.doAs(
+                            (PrivilegedExceptionAction<Iterator<InternalRow>>) () -> new MORSnapshotSplitReader(
+                                    split).buildScanIterator(split.requiredFields(), new Filter[0]));
+                } else {
+                    recordIterator = new MORSnapshotSplitReader(split)
+                            .buildScanIterator(split.requiredFields(), new Filter[0]);
+                }
+            } catch (Exception e) {
+                LOG.error("Failed to open hudi scanner, split params:\n" + debugString, e);
+                throw new RuntimeException(e.getMessage(), e);
+            } finally {
+                cleanResolverLock.readLock().unlock();
+            }
+            isKilled.set(true);
+            executorService.shutdownNow();
+            if (AVRO_RESOLVER_CACHE != null && AVRO_RESOLVER_CACHE.get() != null) {
+                cachedResolvers.computeIfAbsent(Thread.currentThread().getId(),
+                        threadId -> AVRO_RESOLVER_CACHE.get());
+            }
+            getRecordReaderTimeNs += System.nanoTime() - startTime;
+        });
+        try {
+            avroFuture.get();
+        } catch (Exception e) {
+            throw new IOException(e.getMessage(), e);
+        }
     }
 
     @Override
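Note the lock choreography in the new `open()`: every scan holds the read lock while it builds its iterator, and the periodic cleaner takes the write lock before clearing resolver caches, so a cache is never cleared out from under an in-flight scan while concurrent scans still run in parallel. A minimal sketch of that pattern (hypothetical names, not the committed code):

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CleanerLockSketch {
    private static final ReadWriteLock LOCK = new ReentrantReadWriteLock();

    static void scan() {
        // Many scans may hold the read lock at once.
        LOCK.readLock().lock();
        try {
            // ... read avro logs; resolver caches must stay intact here ...
        } finally {
            LOCK.readLock().unlock();
        }
    }

    static void cleanCaches() {
        // The cleaner's write lock excludes all readers, so it only fires
        // when no scan is mid-flight.
        LOCK.writeLock().lock();
        try {
            // ... safe to clear the cached resolvers ...
        } finally {
            LOCK.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        scan();
        cleanCaches();
    }
}
```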
@@ -131,6 +207,7 @@ public class HudiJniScanner extends JniScanner {
         if (recordIterator instanceof Closeable) {
             ((Closeable) recordIterator).close();
         }
+        recordIterator = null;
     }
 
     @Override
@@ -153,7 +153,6 @@ case class HoodieTableInformation(sparkSession: SparkSession,
                                   metaClient: HoodieTableMetaClient,
                                   timeline: HoodieTimeline,
                                   tableConfig: HoodieTableConfig,
-                                  tableAvroSchema: Schema,
                                   internalSchemaOpt: Option[InternalSchema])
 
 /**
@@ -215,7 +214,22 @@ abstract class BaseSplitReader(val split: HoodieSplit) {
    * required to fetch table's Avro and Internal schemas
    */
   protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
-    (tableInformation.tableAvroSchema, tableInformation.internalSchemaOpt)
+    val schemaResolver = new TableSchemaResolver(tableInformation.metaClient)
+    val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
+    val avroSchema: Schema = tableInformation.internalSchemaOpt.map { is =>
+      AvroInternalSchemaConverter.convert(is, namespace + "." + name)
+    } orElse {
+      specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
+    } orElse {
+      split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
+    } getOrElse {
+      Try(schemaResolver.getTableAvroSchema) match {
+        case Success(schema) => schema
+        case Failure(e) =>
+          throw new HoodieSchemaException("Failed to fetch schema from the table", e)
+      }
+    }
+    (avroSchema, tableInformation.internalSchemaOpt)
   }
 
   protected lazy val tableStructSchema: StructType = convertAvroSchemaToStructType(tableAvroSchema)
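The rewritten `lazy val` resolves the schema through an ordered fallback chain (internal schema, then the specified query timestamp, then the split's schema spec, then the table resolver as a last resort) instead of holding a pre-computed `tableAvroSchema` inside the cached `HoodieTableInformation`, so the long-lived table-info cache no longer pins avro schemas in memory. A rough Java `Optional` analogue of the Scala `orElse`/`getOrElse` chain (hypothetical names and stub sources):

```java
import java.util.Optional;

public class SchemaFallbackSketch {
    // Stub sources standing in for the real schema providers.
    static Optional<String> fromInternalSchema() { return Optional.empty(); }
    static Optional<String> fromQueryTimestamp() { return Optional.empty(); }
    static Optional<String> fromSchemaSpec() { return Optional.of("spec-schema"); }
    static String fromResolver() { return "resolver-schema"; } // last resort

    public static void main(String[] args) {
        // Try each source in priority order; only fall through on empty.
        String schema = fromInternalSchema()
                .or(SchemaFallbackSketch::fromQueryTimestamp)
                .or(SchemaFallbackSketch::fromSchemaSpec)
                .orElseGet(SchemaFallbackSketch::fromResolver);
        System.out.println(schema); // spec-schema
    }
}
```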
@@ -649,27 +663,11 @@ object BaseSplitReader {
             None
           }
         }
-        val tableName = metaClient.getTableConfig.getTableName
-        val (name, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
-        val avroSchema: Schema = internalSchemaOpt.map { is =>
-          AvroInternalSchemaConverter.convert(is, namespace + "." + name)
-        } orElse {
-          specifiedQueryTimestamp.map(schemaResolver.getTableAvroSchema)
-        } orElse {
-          split.schemaSpec.map(s => convertToAvroSchema(s, tableName))
-        } getOrElse {
-          Try(schemaResolver.getTableAvroSchema) match {
-            case Success(schema) => schema
-            case Failure(e) =>
-              throw new HoodieSchemaException("Failed to fetch schema from the table", e)
-          }
-        }
 
         HoodieTableInformation(sparkSession,
           metaClient,
           timeline,
           metaClient.getTableConfig,
-          avroSchema,
           internalSchemaOpt)
       }
     }