Files
doris/be/src/vec/exec/scan/hudi_jni_reader.cpp
Ashin Gau 9a83d78dfe [Enhancement](hudi) support hudi mor table, step2 follow #19909 (#20570)
PR(https://github.com/apache/doris/pull/19909) has implemented the framework of hudi reader for MOR table. This PR completes all functions of reading MOR table and enables end-to-end queries.
Key Implementations:
1. Use hudi meta information to generate the table schema, not from hive client.
2. Use the hive client to list hudi partitions, so it strongly depends on the sync tools (https://hudi.apache.org/docs/syncing_metastore/), which sync the partitions of hudi into the hive metastore. However, we may get the hudi partitions directly from the .hoodie directory.
3. Remove `HudiHMSExternalCatalog`, because other catalogs like glue are compatible with the hive catalog.
4. Read the COW table originally from c++.
5. Hudi RecordReader will use ProcessBuilder to start a hotspot debugger process, which may get stuck when attaching to the original JNI process, so I use a tricky method to kill this useless process.
2023-06-10 12:25:53 +08:00

98 lines
3.6 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hudi_jni_reader.h"
#include <map>
#include <ostream>
#include "runtime/descriptors.h"
#include "runtime/types.h"
#include "vec/core/types.h"
// Forward declarations for types that this file only refers to through pointers
// in function signatures (RuntimeState*, RuntimeProfile*, Block*).
namespace doris {
class RuntimeState;
class RuntimeProfile;
} // namespace doris
namespace doris::vectorized {
class Block;
} // namespace doris::vectorized
namespace doris::vectorized {
// Key prefix that the constructor prepends to every scan property name before
// adding the property to the scanner parameter map.
const std::string HudiJniReader::HADOOP_FS_PREFIX("hadoop_fs.");
// Stores the scan/hudi parameters and slot descriptors, assembles the string
// parameter map for the scanner class "org/apache/doris/hudi/HudiJniScanner",
// and creates the JniConnector that will drive it.
//
// scan_params:     scan range parameters; its `properties` entries are forwarded
//                  to the scanner under the HADOOP_FS_PREFIX key prefix.
// hudi_params:     hudi file description (paths, delta logs, schema, instant time,
//                  serde and input format) copied into the parameter map.
// file_slot_descs: slots to read; their column names become "required_fields".
// state, profile:  kept for the later JniConnector::open() call in init_reader().
HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
                             const THudiFileDesc& hudi_params,
                             const std::vector<SlotDescriptor*>& file_slot_descs,
                             RuntimeState* state, RuntimeProfile* profile)
        : _scan_params(scan_params),
          _hudi_params(hudi_params),
          _file_slot_descs(file_slot_descs),
          _state(state),
          _profile(profile) {
    // Column names of the requested slots, in slot order.
    std::vector<std::string> columns_to_read;
    for (auto* slot : _file_slot_descs) {
        columns_to_read.emplace_back(slot->col_name());
    }

    // Everything is passed to the scanner as strings; list-valued fields are
    // flattened with a separator ("," for paths and names, "#" for column types).
    std::map<String, String> scanner_params;
    scanner_params["base_path"] = _hudi_params.base_path;
    scanner_params["data_file_path"] = _hudi_params.data_file_path;
    scanner_params["data_file_length"] = std::to_string(_hudi_params.data_file_length);
    scanner_params["delta_file_paths"] = join(_hudi_params.delta_logs, ",");
    scanner_params["hudi_column_names"] = join(_hudi_params.column_names, ",");
    scanner_params["hudi_column_types"] = join(_hudi_params.column_types, "#");
    scanner_params["required_fields"] = join(columns_to_read, ",");
    scanner_params["instant_time"] = _hudi_params.instant_time;
    scanner_params["serde"] = _hudi_params.serde;
    scanner_params["input_format"] = _hudi_params.input_format;

    // Forward the scan properties under the prefixed key so that a compatible
    // hadoop client can be used to read the data.
    for (const auto& [property_name, property_value] : _scan_params.properties) {
        scanner_params[HADOOP_FS_PREFIX + property_name] = property_value;
    }

    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/hudi/HudiJniScanner",
                                                    scanner_params, columns_to_read);
}
// Fetches the next block from the JNI connector.
//
// block, read_rows and eof are passed straight through to the connector. Once
// the connector reports end-of-file through *eof, the connector is closed and
// the status of that close is returned. Any error from the fetch itself is
// returned immediately, without closing.
Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    RETURN_IF_ERROR(_jni_connector->get_nex_block(block, read_rows, eof));
    if (!*eof) {
        // More data may follow; keep the connector open.
        return Status::OK();
    }
    return _jni_connector->close();
}
// Reports the columns this reader provides: one (column name -> type) entry per
// file slot descriptor is added to *name_to_type. An entry already present under
// the same name is left untouched. missing_cols is not modified here.
// Always returns Status::OK().
Status HudiJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                                  std::unordered_set<std::string>* missing_cols) {
    for (auto* slot : _file_slot_descs) {
        name_to_type->emplace(slot->col_name(), slot->type());
    }
    return Status::OK();
}
// Remembers the column value ranges, initializes the JNI connector with them,
// and then opens the connector with the runtime state and profile captured in
// the constructor. The first failing step's status is returned; otherwise OK.
Status HudiJniReader::init_reader(
        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
    _colname_to_value_range = colname_to_value_range;
    RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
    RETURN_IF_ERROR(_jni_connector->open(_state, _profile));
    return Status::OK();
}
} // namespace doris::vectorized