branch-2.1: [Fix](catalog) Fixes query failures for Paimon tables stored in Kerberized HDFS #47192 (#47694)

Cherry-picked from #47192

Co-authored-by: Calvin Kirs <guoqiang@selectdb.com>
github-actions[bot]
2025-02-12 09:16:30 +08:00
committed by GitHub
parent b4f03c0eb3
commit b4af671347
5 changed files with 159 additions and 36 deletions
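The patch makes the JNI scanners (HadoopHudiJniScanner and PaimonJniScanner) obtain a PreExecutionAuthenticator from the new PreExecutionAuthenticatorCache and run table initialization and batch reads through it, so that work executes under the configured Kerberos (or simple) Hadoop identity instead of failing against Kerberized HDFS. A minimal sketch of that pattern, using an illustrative scanner class and option map rather than the real ones in the diff below:

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.io.IOException;
import java.util.Map;

// Illustrative only: mirrors how the patched scanners wrap their Hadoop I/O.
public class ExampleJniScanner {
    private final PreExecutionAuthenticator preExecutionAuthenticator;

    public ExampleJniScanner(Map<String, String> hadoopOptionParams) {
        // One authenticator per distinct Hadoop configuration, served from the cache.
        this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
    }

    public void open() throws IOException {
        try {
            // The callable body runs inside the authenticated context (Kerberos login or simple user).
            preExecutionAuthenticator.execute(() -> {
                // initTable(); initReader(); ...
                return null;
            });
        } catch (Exception e) {
            throw new IOException("failed to open scanner under authenticated context", e);
        }
    }
}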

View File

@@ -20,6 +20,8 @@ package org.apache.doris.hudi;
import org.apache.doris.common.classloader.ThreadClassLoaderContext;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -92,6 +94,8 @@ public class HadoopHudiJniScanner extends JniScanner {
private final int fetchSize;
private final ClassLoader classLoader;
private final PreExecutionAuthenticator preExecutionAuthenticator;
public HadoopHudiJniScanner(int fetchSize, Map<String, String> params) {
this.basePath = params.get("base_path");
this.dataFilePath = params.get("data_file_path");
@@ -120,6 +124,7 @@ public class HadoopHudiJniScanner extends JniScanner {
LOG.debug("get hudi params {}: {}", entry.getKey(), entry.getValue());
}
}
this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(fsOptionsProps);
ZoneId zoneId;
if (Strings.isNullOrEmpty(params.get("time_zone"))) {
@@ -135,10 +140,14 @@ public class HadoopHudiJniScanner extends JniScanner {
@Override
public void open() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
initRequiredColumnsAndTypes();
initTableInfo(requiredTypes, requiredFields, fetchSize);
Properties properties = getReaderProperties();
initReader(properties);
preExecutionAuthenticator.execute(() -> {
initRequiredColumnsAndTypes();
initTableInfo(requiredTypes, requiredFields, fetchSize);
Properties properties = getReaderProperties();
initReader(properties);
return null;
});
} catch (Exception e) {
close();
LOG.warn("failed to open hadoop hudi jni scanner", e);
@@ -149,25 +158,27 @@ public class HadoopHudiJniScanner extends JniScanner {
@Override
public int getNext() throws IOException {
try (ThreadClassLoaderContext ignored = new ThreadClassLoaderContext(classLoader)) {
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
int numRows = 0;
for (; numRows < fetchSize; numRows++) {
if (!reader.next(key, value)) {
break;
return preExecutionAuthenticator.execute(() -> {
NullWritable key = reader.createKey();
ArrayWritable value = reader.createValue();
int numRows = 0;
for (; numRows < fetchSize; numRows++) {
if (!reader.next(key, value)) {
break;
}
Object rowData = deserializer.deserialize(value);
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
// LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
// numRows, i, types[i].getName(), types[i].getType().name(),
// fieldInspectors[i].getTypeName());
columnValue.setField(types[i], fieldInspectors[i]);
appendData(i, columnValue);
}
}
Object rowData = deserializer.deserialize(value);
for (int i = 0; i < fields.length; i++) {
Object fieldData = rowInspector.getStructFieldData(rowData, structFields[i]);
columnValue.setRow(fieldData);
// LOG.info("rows: {}, column: {}, col name: {}, col type: {}, inspector: {}",
// numRows, i, types[i].getName(), types[i].getType().name(),
// fieldInspectors[i].getTypeName());
columnValue.setField(types[i], fieldInspectors[i]);
appendData(i, columnValue);
}
}
return numRows;
return numRows;
});
} catch (Exception e) {
close();
LOG.warn("failed to get next in hadoop hudi jni scanner", e);

View File

@@ -39,12 +39,6 @@ under the License.
<groupId>org.apache.doris</groupId>
<artifactId>java-common</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
<artifactId>fe-common</artifactId>
<groupId>org.apache.doris</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>

View File

@@ -20,6 +20,8 @@ package org.apache.doris.paimon;
import org.apache.doris.common.jni.JniScanner;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.TableSchema;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
import org.apache.doris.paimon.PaimonTableCache.PaimonTableCacheKey;
import org.apache.doris.paimon.PaimonTableCache.TableExt;
@@ -74,6 +76,7 @@ public class PaimonJniScanner extends JniScanner {
private long lastUpdateTime;
private RecordReader.RecordIterator<InternalRow> recordIterator = null;
private final ClassLoader classLoader;
private PreExecutionAuthenticator preExecutionAuthenticator;
public PaimonJniScanner(int batchSize, Map<String, String> params) {
this.classLoader = this.getClass().getClassLoader();
@@ -104,6 +107,7 @@ public class PaimonJniScanner extends JniScanner {
.filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
.collect(Collectors
.toMap(kv1 -> kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
this.preExecutionAuthenticator = PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
}
@Override
@@ -114,12 +118,16 @@ public class PaimonJniScanner extends JniScanner {
// `Thread.currentThread().getContextClassLoader().getResource(HIVE_SITE_FILE)`
// so we need to provide a classloader, otherwise it will cause NPE.
Thread.currentThread().setContextClassLoader(classLoader);
initTable();
initReader();
preExecutionAuthenticator.execute(() -> {
initTable();
initReader();
return null;
});
resetDatetimeV2Precision();
} catch (Throwable e) {
LOG.warn("Failed to open paimon_scanner: " + e.getMessage(), e);
throw e;
throw new RuntimeException(e);
}
}
@@ -137,7 +145,7 @@ public class PaimonJniScanner extends JniScanner {
readBuilder.withFilter(getPredicates());
reader = readBuilder.newRead().executeFilter().createReader(getSplit());
paimonDataTypeList =
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
Arrays.stream(projected).mapToObj(i -> table.rowType().getTypeAt(i)).collect(Collectors.toList());
}
private int[] getProjected() {
@@ -183,8 +191,7 @@ public class PaimonJniScanner extends JniScanner {
}
}
@Override
protected int getNext() throws IOException {
private int readAndProcessNextBatch() throws IOException {
int rows = 0;
try {
if (recordIterator == null) {
@@ -210,13 +217,22 @@
} catch (Exception e) {
close();
LOG.warn("Failed to get the next batch of paimon. "
+ "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
+ "split: {}, requiredFieldNames: {}, paimonAllFieldNames: {}, dataType: {}",
getSplit(), params.get("required_fields"), paimonAllFieldNames, paimonDataTypeList, e);
throw new IOException(e);
}
return rows;
}
@Override
protected int getNext() {
try {
return preExecutionAuthenticator.execute(this::readAndProcessNextBatch);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
protected TableSchema parseTableSchema() throws UnsupportedOperationException {
// do nothing

View File

@@ -23,6 +23,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
public abstract class AuthenticationConfig {
private static final Logger LOG = LogManager.getLogger(AuthenticationConfig.class);
public static String HADOOP_USER_NAME = "hadoop.username";
@@ -31,12 +33,24 @@ public abstract class AuthenticationConfig {
public static String HIVE_KERBEROS_PRINCIPAL = "hive.metastore.kerberos.principal";
public static String HIVE_KERBEROS_KEYTAB = "hive.metastore.kerberos.keytab.file";
public static String DORIS_KRB5_DEBUG = "doris.krb5.debug";
private static final String DEFAULT_HADOOP_USERNAME = "hadoop";
/**
* @return true if the config is valid, otherwise false.
*/
public abstract boolean isValid();
protected static String generalAuthenticationConfigKey(Map<String, String> conf) {
String authentication = conf.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
null);
if (AuthType.KERBEROS.getDesc().equals(authentication)) {
return conf.get(HADOOP_KERBEROS_PRINCIPAL) + "-" + conf.get(HADOOP_KERBEROS_KEYTAB) + "-"
+ conf.getOrDefault(DORIS_KRB5_DEBUG, "false");
} else {
return conf.getOrDefault(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
}
}
/**
* get kerberos config from hadoop conf
* @param conf config
@@ -90,7 +104,8 @@ public abstract class AuthenticationConfig {
private static AuthenticationConfig createSimpleAuthenticationConfig(Configuration conf) {
// AuthType.SIMPLE
SimpleAuthenticationConfig simpleAuthenticationConfig = new SimpleAuthenticationConfig();
simpleAuthenticationConfig.setUsername(conf.get(HADOOP_USER_NAME));
String hadoopUserName = conf.get(HADOOP_USER_NAME, DEFAULT_HADOOP_USERNAME);
simpleAuthenticationConfig.setUsername(hadoopUserName);
return simpleAuthenticationConfig;
}
}
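For reference, the generalAuthenticationConfigKey method added above derives the cache key from only the authentication-relevant entries: for Kerberos it joins principal, keytab path, and the doris.krb5.debug flag with "-", otherwise it uses hadoop.username, now defaulting to "hadoop". A rough illustration of the two key shapes, with placeholder principal, keytab, and user values and assuming the Kerberos auth type string is "kerberos":

import org.apache.doris.common.security.authentication.AuthenticationConfig;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;

import java.util.HashMap;
import java.util.Map;

// Illustrative inputs only; the expected key shapes are shown in comments.
public class CacheKeyShapes {
    public static void main(String[] args) {
        Map<String, String> kerberosConf = new HashMap<>();
        kerberosConf.put(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
        kerberosConf.put(AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL, "doris/_HOST@EXAMPLE.COM");
        kerberosConf.put(AuthenticationConfig.HADOOP_KERBEROS_KEYTAB, "/etc/security/doris.keytab");
        // Kerberos key: "<principal>-<keytab>-<krb5 debug>", e.g.
        // "doris/_HOST@EXAMPLE.COM-/etc/security/doris.keytab-false"

        Map<String, String> simpleConf = new HashMap<>();
        simpleConf.put(AuthenticationConfig.HADOOP_USER_NAME, "etl_user");
        // Simple-auth key: the hadoop.username value, here "etl_user";
        // an empty map falls back to the new "hadoop" default.
    }
}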

View File

@@ -0,0 +1,87 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.security.authentication;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
* A cache class for storing and retrieving PreExecutionAuthenticator instances based on Hadoop configurations.
* This class caches PreExecutionAuthenticator objects to avoid recreating them for the same Hadoop configuration.
* It uses a Least Recently Used (LRU) cache, where the least recently used entries are removed when the cache exceeds
* the maximum size (MAX_CACHE_SIZE).
* <p>
* The purpose of this class is to ensure that for identical Hadoop configurations (key-value pairs),
* only one PreExecutionAuthenticator instance is created and reused, optimizing performance by reducing
* redundant instantiations.
*/
public class PreExecutionAuthenticatorCache {
private static final Logger LOG = LogManager.getLogger(PreExecutionAuthenticatorCache.class);
private static final int MAX_CACHE_SIZE = 100;
private static final Cache<String, PreExecutionAuthenticator> preExecutionAuthenticatorCache =
CacheBuilder.newBuilder()
.maximumSize(MAX_CACHE_SIZE)
.expireAfterAccess(60 * 24, TimeUnit.MINUTES)
.build();
/**
* Retrieves a PreExecutionAuthenticator instance from the cache or creates a new one if it doesn't exist.
* This method first checks if the configuration is already cached. If not, it computes a new instance and
* caches it for future use.
*
* @param hadoopConfig The Hadoop configuration (key-value pairs)
* @return A PreExecutionAuthenticator instance for the given configuration
*/
public static PreExecutionAuthenticator getAuthenticator(Map<String, String> hadoopConfig) {
String authenticatorCacheKey = AuthenticationConfig.generalAuthenticationConfigKey(hadoopConfig);
PreExecutionAuthenticator authenticator;
try {
authenticator = preExecutionAuthenticatorCache.get(authenticatorCacheKey,
() -> createAuthenticator(hadoopConfig, authenticatorCacheKey));
} catch (ExecutionException exception) {
throw new RuntimeException("Failed to create PreExecutionAuthenticator for key: " + authenticatorCacheKey,
exception);
}
return authenticator;
}
private static PreExecutionAuthenticator createAuthenticator(Map<String, String> hadoopConfig,
String authenticatorCacheKey) {
Configuration conf = new Configuration();
hadoopConfig.forEach(conf::set);
PreExecutionAuthenticator preExecutionAuthenticator = new PreExecutionAuthenticator();
AuthenticationConfig authenticationConfig = AuthenticationConfig.getKerberosConfig(
conf, AuthenticationConfig.HADOOP_KERBEROS_PRINCIPAL,
AuthenticationConfig.HADOOP_KERBEROS_KEYTAB);
HadoopAuthenticator hadoopAuthenticator = HadoopAuthenticator
.getHadoopAuthenticator(authenticationConfig);
preExecutionAuthenticator.setHadoopAuthenticator(hadoopAuthenticator);
LOG.info("Creating new PreExecutionAuthenticator for configuration, Cache key: {}",
authenticatorCacheKey);
return preExecutionAuthenticator;
}
}
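A usage sketch for the new cache, tying the pieces together; the option map and its values are placeholders, and it assumes PreExecutionAuthenticator.execute accepts a Callable, as the scanner changes above indicate:

import org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
import org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;

import java.util.HashMap;
import java.util.Map;

// Illustrates the reuse behavior described in the class comment above.
public class AuthenticatorCacheExample {
    public static void main(String[] args) throws Exception {
        Map<String, String> opts = new HashMap<>();
        opts.put("hadoop.username", "etl_user"); // simple auth; this value becomes the cache key

        PreExecutionAuthenticator first = PreExecutionAuthenticatorCache.getAuthenticator(opts);
        PreExecutionAuthenticator second = PreExecutionAuthenticatorCache.getAuthenticator(new HashMap<>(opts));
        System.out.println(first == second); // true: same key, so the cached instance is reused

        // Hadoop-facing work runs inside the authenticated context and returns its result.
        Integer rows = first.execute(() -> 0);
        System.out.println(rows);
    }
}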