[feature](jni) add jni metrics and attach to BE profile automatically (#21004)

Add JNI metrics, for example:
```
-  HudiJniScanner:  0ns
  -  FillBlockTime:  31.29ms
  -  GetRecordReaderTime:  1m5s
  -  JavaScanTime:  35s991ms
  -  OpenScannerTime:  1m6s
```
Add three common performance metrics for JNI scanner:
1. `OpenScannerTime`: Time to initialize and open the JNI scanner
2. `JavaScanTime`: Time to scan data and insert it into the vector table on the Java side
3. `FillBlockTime`: Time to convert the Java vector table to a C++ block

User-defined metrics on the Java side are also supported. For example, when `OpenScannerTime` shows that the open process takes a long time and we want to determine which sub-step is responsible, we can add `GetRecordReaderTime` on the Java side.
User-defined metrics on the Java side are attached to the BE profile automatically.
This commit is contained in:
Ashin Gau
2023-06-21 11:19:02 +08:00
committed by GitHub
parent b4773e1195
commit ef17289925
6 changed files with 174 additions and 23 deletions

View File

@ -243,6 +243,87 @@ Status JniUtil::GetJniExceptionMsg(JNIEnv* env, bool log_stack, const string& pr
return Status::InternalError("{}{}", prefix, msg_str_guard.get());
}
// Builds a java.util.HashMap whose entries mirror `map`.
//
// env: JNI environment used for every JNI call made here.
// map: key/value pairs to copy; each key and value is converted with NewStringUTF.
//
// Returns the HashMap as a local reference. This function does not delete that
// reference, so releasing it is left to the caller.
jobject JniUtil::convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map) {
    jclass hashmap_class = env->FindClass("java/util/HashMap");
    jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V");
    // NewObject is variadic, so the argument has to carry exactly the type named in the
    // "(I)V" signature. Narrow size_t to jint explicitly instead of pushing a wider value.
    const jint initial_capacity = static_cast<jint>(map.size());
    jobject hashmap_object = env->NewObject(hashmap_class, hashmap_constructor, initial_capacity);
    jmethodID hashmap_put = env->GetMethodID(
            hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
    for (const auto& it : map) {
        jstring key = env->NewStringUTF(it.first.c_str());
        jstring value = env->NewStringUTF(it.second.c_str());
        env->CallObjectMethod(hashmap_object, hashmap_put, key, value);
        // The strings are only needed for the put() call; drop the local refs right away
        // so that a large map does not accumulate local references.
        env->DeleteLocalRef(key);
        env->DeleteLocalRef(value);
    }
    env->DeleteLocalRef(hashmap_class);
    return hashmap_object;
}
// Copies the entries of a Java map into a C++ std::map<std::string, std::string>.
//
// env: JNI environment used for every JNI call made here.
// map: reference to a java.util.Map; its keys and values are read as java.lang.String.
//      A null reference yields an empty result.
//
// Returns the copied key/value pairs. Entries whose key or value is null, or whose
// characters cannot be obtained from the JVM, are skipped.
// All local references created inside this function are deleted before returning.
std::map<std::string, std::string> JniUtil::convert_to_cpp_map(JNIEnv* env, jobject map) {
    std::map<std::string, std::string> resultMap;
    // Calling a method on a null reference is not allowed, so treat null as "no entries".
    if (map == nullptr) {
        return resultMap;
    }

    // Get the class and method ID of the java.util.Map interface
    jclass mapClass = env->FindClass("java/util/Map");
    jmethodID entrySetMethod = env->GetMethodID(mapClass, "entrySet", "()Ljava/util/Set;");

    // Get the class and method ID of the java.util.Set interface
    jclass setClass = env->FindClass("java/util/Set");
    jmethodID iteratorSetMethod = env->GetMethodID(setClass, "iterator", "()Ljava/util/Iterator;");

    // Get the class and method ID of the java.util.Iterator interface
    jclass iteratorClass = env->FindClass("java/util/Iterator");
    jmethodID hasNextMethod = env->GetMethodID(iteratorClass, "hasNext", "()Z");
    jmethodID nextMethod = env->GetMethodID(iteratorClass, "next", "()Ljava/lang/Object;");

    // Get the class and method ID of the java.util.Map.Entry interface
    jclass entryClass = env->FindClass("java/util/Map$Entry");
    jmethodID getKeyMethod = env->GetMethodID(entryClass, "getKey", "()Ljava/lang/Object;");
    jmethodID getValueMethod = env->GetMethodID(entryClass, "getValue", "()Ljava/lang/Object;");

    // Call the entrySet method to get the set of key-value pairs
    jobject entrySet = env->CallObjectMethod(map, entrySetMethod);

    // Call the iterator method on the set to iterate over the key-value pairs
    jobject iteratorSet = env->CallObjectMethod(entrySet, iteratorSetMethod);

    // Iterate over the key-value pairs
    while (env->CallBooleanMethod(iteratorSet, hasNextMethod)) {
        // Get the current entry
        jobject entry = env->CallObjectMethod(iteratorSet, nextMethod);

        // Get the key and value from the entry
        jstring javaKey = static_cast<jstring>(env->CallObjectMethod(entry, getKeyMethod));
        jstring javaValue = static_cast<jstring>(env->CallObjectMethod(entry, getValueMethod));

        // Convert the key and value to C++ strings. Only ask the JVM for characters when
        // the reference is non-null; GetStringUTFChars itself may also return null.
        const char* key = javaKey != nullptr ? env->GetStringUTFChars(javaKey, nullptr) : nullptr;
        const char* value =
                javaValue != nullptr ? env->GetStringUTFChars(javaValue, nullptr) : nullptr;

        // Store the key-value pair in the map; building std::string from nullptr is
        // undefined behaviour, so incomplete entries are skipped.
        if (key != nullptr && value != nullptr) {
            resultMap[key] = value;
        }

        // Release the string buffers that were actually handed out
        if (key != nullptr) {
            env->ReleaseStringUTFChars(javaKey, key);
        }
        if (value != nullptr) {
            env->ReleaseStringUTFChars(javaValue, value);
        }

        // Delete local references
        env->DeleteLocalRef(entry);
        if (javaKey != nullptr) {
            env->DeleteLocalRef(javaKey);
        }
        if (javaValue != nullptr) {
            env->DeleteLocalRef(javaValue);
        }
    }

    // Delete local references
    env->DeleteLocalRef(iteratorSet);
    env->DeleteLocalRef(entrySet);
    env->DeleteLocalRef(mapClass);
    env->DeleteLocalRef(setClass);
    env->DeleteLocalRef(iteratorClass);
    env->DeleteLocalRef(entryClass);

    return resultMap;
}
Status JniUtil::GetGlobalClassRef(JNIEnv* env, const char* class_str, jclass* class_ref) {
*class_ref = NULL;
jclass local_cl = env->FindClass(class_str);

View File

@ -75,6 +75,9 @@ public:
return INITIAL_RESERVED_BUFFER_SIZE << n;
}
// Creates a java.util.HashMap holding the given string key/value pairs and
// returns it as a JNI reference.
static jobject convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map);
// Iterates over the entries of a java.util.Map and copies each key/value pair,
// read as Java strings, into a C++ map.
static std::map<std::string, std::string> convert_to_cpp_map(JNIEnv* env, jobject map);
private:
static Status GetJNIEnvSlowPath(JNIEnv** env);

View File

@ -63,6 +63,12 @@ JniConnector::~JniConnector() {
}
Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
_state = state;
_profile = profile;
ADD_TIMER(_profile, _connector_name.c_str());
_open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", _connector_name.c_str());
_java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", _connector_name.c_str());
_fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", _connector_name.c_str());
// cannot put the env into fields, because frames in an env object is limited
// to avoid limited frames in a thread, we should get local env in a method instead of in whole object.
JNIEnv* env = nullptr;
@ -70,6 +76,7 @@ Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) {
if (env == nullptr) {
return Status::InternalError("Failed to get/create JVM");
}
SCOPED_TIMER(_open_scanner_time);
RETURN_IF_ERROR(_init_jni_scanner(env, state->batch_size()));
// Call org.apache.doris.common.jni.JniScanner#open
env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open);
@ -94,7 +101,11 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
// return the address of meta information
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
long meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
long meta_address = 0;
{
SCOPED_TIMER(_java_scan_time);
meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch);
}
RETURN_ERROR_IF_EXC(env);
if (meta_address == 0) {
// Address == 0 when there's no data in scanner
@ -118,10 +129,43 @@ Status JniConnector::get_nex_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
// Fetches the metrics reported by the Java scanner and converts them to a C++ map.
//
// env: JNI environment used to invoke the scanner's statistics method.
//
// Returns the key/value pairs produced by the Java side. An empty map is returned when
// the Java call raises an exception or returns null, so a failure to collect metrics
// does not leave an exception pending for the JNI calls the caller makes afterwards.
std::map<std::string, std::string> JniConnector::get_statistics(JNIEnv* env) {
    jobject metrics = env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_statistics);
    if (env->ExceptionCheck()) {
        // Most JNI functions must not be called while an exception is pending:
        // report it, clear it and fall back to "no metrics".
        LOG(WARNING) << "Failed to get statistics from JNI scanner " << _connector_name;
        env->ExceptionDescribe();
        env->ExceptionClear();
        if (metrics != nullptr) {
            env->DeleteLocalRef(metrics);
        }
        return {};
    }
    if (metrics == nullptr) {
        return {};
    }
    std::map<std::string, std::string> result = JniUtil::convert_to_cpp_map(env, metrics);
    env->DeleteLocalRef(metrics);
    return result;
}
Status JniConnector::close() {
if (!_closed) {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
// update scanner metrics
for (const auto& metric : get_statistics(env)) {
std::vector<std::string> type_and_name = split(metric.first, ":");
if (type_and_name.size() != 2) {
LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
<< "'metricType:metricName'";
continue;
}
long metric_value = std::stol(metric.second);
RuntimeProfile::Counter* scanner_counter;
if (type_and_name[0] == "timer") {
scanner_counter =
ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
} else if (type_and_name[0] == "counter") {
scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
_connector_name.c_str());
} else if (type_and_name[0] == "bytes") {
scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
_connector_name.c_str());
} else {
LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
continue;
}
COUNTER_UPDATE(scanner_counter, metric_value);
}
// _fill_block may be failed and returned, we should release table in close.
// org.apache.doris.common.jni.JniScanner#releaseTable is idempotent
env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@ -145,41 +189,27 @@ Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) {
RETURN_ERROR_IF_EXC(env);
// prepare constructor parameters
jclass hashmap_class = env->FindClass("java/util/HashMap");
jmethodID hashmap_constructor = env->GetMethodID(hashmap_class, "<init>", "(I)V");
jobject hashmap_object =
env->NewObject(hashmap_class, hashmap_constructor, _scanner_params.size());
jmethodID hashmap_put = env->GetMethodID(
hashmap_class, "put", "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;");
RETURN_ERROR_IF_EXC(env);
for (const auto& it : _scanner_params) {
jstring key = env->NewStringUTF(it.first.c_str());
jstring value = env->NewStringUTF(it.second.c_str());
env->CallObjectMethod(hashmap_object, hashmap_put, key, value);
env->DeleteLocalRef(key);
env->DeleteLocalRef(value);
}
env->DeleteLocalRef(hashmap_class);
_jni_scanner_obj =
jobject hashmap_object = JniUtil::convert_to_java_map(env, _scanner_params);
jobject jni_scanner_obj =
env->NewObject(_jni_scanner_cls, scanner_constructor, batch_size, hashmap_object);
env->DeleteLocalRef(hashmap_object);
RETURN_ERROR_IF_EXC(env);
_jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V");
RETURN_ERROR_IF_EXC(env);
_jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, "getNextBatchMeta", "()J");
RETURN_ERROR_IF_EXC(env);
_jni_scanner_close = env->GetMethodID(_jni_scanner_cls, "close", "()V");
RETURN_ERROR_IF_EXC(env);
_jni_scanner_release_column = env->GetMethodID(_jni_scanner_cls, "releaseColumn", "(I)V");
RETURN_ERROR_IF_EXC(env);
_jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V");
_jni_scanner_get_statistics =
env->GetMethodID(_jni_scanner_cls, "getStatistics", "()Ljava/util/Map;");
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_scanner_obj, &_jni_scanner_obj));
env->DeleteLocalRef(jni_scanner_obj);
RETURN_ERROR_IF_EXC(env);
RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, _jni_scanner_obj, &_jni_scanner_obj));
return Status::OK();
}
Status JniConnector::_fill_block(Block* block, size_t num_rows) {
SCOPED_TIMER(_fill_block_time);
for (int i = 0; i < _column_names.size(); ++i) {
auto& column_with_type_and_name = block->get_by_name(_column_names[i]);
auto& column_ptr = column_with_type_and_name.column;

View File

@ -163,7 +163,10 @@ public:
std::vector<std::string> column_names)
: _connector_class(std::move(connector_class)),
_scanner_params(std::move(scanner_params)),
_column_names(std::move(column_names)) {}
_column_names(std::move(column_names)) {
// Use java class name as connector name
_connector_name = split(_connector_class, "/").back();
}
/// Should release jni resources if other functions are failed.
~JniConnector();
@ -197,6 +200,11 @@ public:
*/
Status get_nex_block(Block* block, size_t* read_rows, bool* eof);
/**
 * Get performance metrics from java scanner.
 * Invokes the scanner's statistics method through JNI and returns the resulting
 * entries as a C++ map of strings.
 * @param env JNI environment used for the call.
 * @return metric key/value pairs as reported by the java scanner.
 */
std::map<std::string, std::string> get_statistics(JNIEnv* env);
/**
* Close scanner and release jni resources.
*/
@ -210,10 +218,18 @@ public:
static Status generate_meta_info(Block* block, std::unique_ptr<long[]>& meta);
private:
std::string _connector_name;
std::string _connector_class;
std::map<std::string, std::string> _scanner_params;
std::vector<std::string> _column_names;
RuntimeState* _state;
RuntimeProfile* _profile;
RuntimeProfile::Counter* _open_scanner_time;
RuntimeProfile::Counter* _java_scan_time;
RuntimeProfile::Counter* _fill_block_time;
std::map<std::string, RuntimeProfile::Counter*> _scanner_profile;
size_t _has_read = 0;
bool _closed = false;
@ -224,6 +240,7 @@ private:
jmethodID _jni_scanner_close;
jmethodID _jni_scanner_release_column;
jmethodID _jni_scanner_release_table;
jmethodID _jni_scanner_get_statistics;
long* _meta_ptr;
int _meta_index;

View File

@ -41,6 +41,7 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -63,6 +64,8 @@ public class HudiJniScanner extends JniScanner {
private Deserializer deserializer;
private final ClassLoader classLoader;
private long getRecordReaderTimeNs = 0;
public HudiJniScanner(int fetchSize, Map<String, String> params) {
if (LOG.isDebugEnabled()) {
LOG.debug("Hudi JNI params:\n" + params.entrySet().stream().map(kv -> kv.getKey() + "=" + kv.getValue())
@ -182,6 +185,7 @@ public class HudiJniScanner extends JniScanner {
}
}, 100, 1000, TimeUnit.MILLISECONDS);
long startTime = System.nanoTime();
if (ugi != null) {
reader = ugi.doAs((PrivilegedExceptionAction<RecordReader<NullWritable, ArrayWritable>>) () -> {
RecordReader<NullWritable, ArrayWritable> ugiReader
@ -193,6 +197,7 @@ public class HudiJniScanner extends JniScanner {
reader = (RecordReader<NullWritable, ArrayWritable>) inputFormatClass
.getRecordReader(hudiSplit, jobConf, Reporter.NULL);
}
getRecordReaderTimeNs += System.nanoTime() - startTime;
isKilled.set(true);
executorService.shutdownNow();
@ -207,4 +212,8 @@ public class HudiJniScanner extends JniScanner {
}
}
@Override
public Map<String, String> getStatistics() {
return Collections.singletonMap("timer:GetRecordReaderTime", String.valueOf(getRecordReaderTimeNs));
}
}

View File

@ -24,6 +24,8 @@ import org.apache.doris.common.jni.vec.ScanPredicate;
import org.apache.doris.common.jni.vec.VectorTable;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
public abstract class JniScanner {
protected VectorTable vectorTable;
@ -79,6 +81,15 @@ public abstract class JniScanner {
return getMetaAddress(numRows);
}
/**
* Get performance metrics. The key should be pattern like "metricType:metricName".
* Support three metric types: timer, counter and bytes.
* The c++ side will attach metricName into profile automatically.
*/
public Map<String, String> getStatistics() {
return Collections.emptyMap();
}
private long getMetaAddress(int numRows) {
vectorTable.setNumRows(numRows);
return vectorTable.getMetaAddress();