[fix](multi-catalog) Fix multi-thread issue in hive/iceberg writer commit meta-info to fe. (#49920)
This commit is contained in:
@ -583,37 +583,29 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!req.runtime_state->hive_partition_updates().empty()) {
|
||||
if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
|
||||
params.__isset.hive_partition_updates = true;
|
||||
params.hive_partition_updates.reserve(
|
||||
req.runtime_state->hive_partition_updates().size());
|
||||
for (auto& hive_partition_update : req.runtime_state->hive_partition_updates()) {
|
||||
params.hive_partition_updates.push_back(hive_partition_update);
|
||||
}
|
||||
params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
|
||||
hpu.end());
|
||||
} else if (!req.runtime_states.empty()) {
|
||||
for (auto* rs : req.runtime_states) {
|
||||
if (!rs->hive_partition_updates().empty()) {
|
||||
if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
|
||||
params.__isset.hive_partition_updates = true;
|
||||
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
|
||||
rs->hive_partition_updates().begin(),
|
||||
rs->hive_partition_updates().end());
|
||||
rs_hpu.begin(), rs_hpu.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!req.runtime_state->iceberg_commit_datas().empty()) {
|
||||
if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
|
||||
params.__isset.iceberg_commit_datas = true;
|
||||
params.iceberg_commit_datas.reserve(req.runtime_state->iceberg_commit_datas().size());
|
||||
for (auto& iceberg_commit_data : req.runtime_state->iceberg_commit_datas()) {
|
||||
params.iceberg_commit_datas.push_back(iceberg_commit_data);
|
||||
}
|
||||
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
|
||||
icd.end());
|
||||
} else if (!req.runtime_states.empty()) {
|
||||
for (auto* rs : req.runtime_states) {
|
||||
if (!rs->iceberg_commit_datas().empty()) {
|
||||
if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
|
||||
params.__isset.iceberg_commit_datas = true;
|
||||
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
|
||||
rs->iceberg_commit_datas().begin(),
|
||||
rs->iceberg_commit_datas().end());
|
||||
rs_icd.begin(), rs_icd.end());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -481,9 +481,25 @@ public:
|
||||
std::make_move_iterator(tablet_infos.end()));
|
||||
}
|
||||
|
||||
std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }
|
||||
std::vector<THivePartitionUpdate> hive_partition_updates() const {
|
||||
std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex);
|
||||
return _hive_partition_updates;
|
||||
}
|
||||
|
||||
std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }
|
||||
void add_hive_partition_updates(const THivePartitionUpdate& hive_partition_update) {
|
||||
std::lock_guard<std::mutex> lock(_hive_partition_updates_mutex);
|
||||
_hive_partition_updates.emplace_back(hive_partition_update);
|
||||
}
|
||||
|
||||
std::vector<TIcebergCommitData> iceberg_commit_datas() const {
|
||||
std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex);
|
||||
return _iceberg_commit_datas;
|
||||
}
|
||||
|
||||
void add_iceberg_commit_datas(const TIcebergCommitData& iceberg_commit_data) {
|
||||
std::lock_guard<std::mutex> lock(_iceberg_commit_datas_mutex);
|
||||
_iceberg_commit_datas.emplace_back(iceberg_commit_data);
|
||||
}
|
||||
|
||||
// local runtime filter mgr, the runtime filter do not have remote target or
|
||||
// not need local merge should regist here. the instance exec finish, the local
|
||||
@ -807,8 +823,10 @@ private:
|
||||
int _task_id = -1;
|
||||
int _task_num = 0;
|
||||
|
||||
mutable std::mutex _hive_partition_updates_mutex;
|
||||
std::vector<THivePartitionUpdate> _hive_partition_updates;
|
||||
|
||||
mutable std::mutex _iceberg_commit_datas_mutex;
|
||||
std::vector<TIcebergCommitData> _iceberg_commit_datas;
|
||||
|
||||
std::vector<std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>> _op_id_to_local_state;
|
||||
|
||||
@ -117,7 +117,8 @@ Status VIcebergPartitionWriter::close(const Status& status) {
|
||||
}
|
||||
}
|
||||
if (status_ok) {
|
||||
_state->iceberg_commit_datas().emplace_back(_build_iceberg_commit_data());
|
||||
auto commit_data = _build_iceberg_commit_data();
|
||||
_state->add_iceberg_commit_datas(commit_data);
|
||||
}
|
||||
return result_status;
|
||||
}
|
||||
|
||||
@ -131,7 +131,8 @@ Status VHivePartitionWriter::close(const Status& status) {
|
||||
}
|
||||
}
|
||||
if (status_ok) {
|
||||
_state->hive_partition_updates().emplace_back(_build_partition_update());
|
||||
auto partition_update = _build_partition_update();
|
||||
_state->add_hive_partition_updates(partition_update);
|
||||
}
|
||||
return result_status;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user