Files
doris/be/src/util/brpc_client_cache.h
zhangdong b129c9901b [improvement](FQDN)Change the implementation of fqdn (#19123)
Main changes:
1. If fqdn is enabled in the configuration file, then when FE starts, localAddr is set to the FQDN instead of the IP, and priority_networks no longer takes effect
2. The IP and host name of each Backend and Frontend are combined into one field, host. When fqdn is enabled, it holds the host name; when not enabled, it holds the IP address
3. Communication within the cluster uses the fqdn directly, and the various connection pools add a verification mechanism so that connections between nodes do not break when the IP address behind a domain name changes
4. No longer requires polling to verify if the IP has changed, delete fqdnManager
5. Change how FEs verify the legitimacy of a peer node: instead of reading the client IP of the connection, the sending node now states its own identity explicitly in the HTTP request header or in the Thrift message body
6. When processing the heartbeat, if BE finds that the host stored by itself is inconsistent with the host stored by the master, after verifying the legitimacy of the host, it will change its own host instead of directly reporting an error
7. Simplify the generation logic of fe name

Scope of influence:
1. Establishing communication connections between clusters
2. Determine whether it is the same node through attributes such as IP
3. Print Log
4. Information display
5. Address Splicing
6. k8s deployment
7. Upgrade compatibility

Test plan:
1. Change the IP address of the node, while keeping the fqdn unchanged, change the IP addresses of fe and be, and verify whether the cluster can read and write data normally
2. Use the master code to generate metadata, and use the previous metadata on the current pr to verify whether it is compatible with the old version (upgrading is no longer supported if fqdn has been enabled before)
3. Deploy fe and be clusters using k8s to verify whether the cluster can read and write data normally
4. According to https://doris.apache.org/zh-CN/docs/dev/admin-manual/cluster-management/fqdn?_highlight=fqdn#%E6%97%A7%E9%9B%86%E7%BE%A4%E5%90%AF%E7%94%A8fqdn Upgrading old clusters
5. Use streamload to specify the fqdn of fe and be to import data separately
6. Use different users to start transactions and write data using insert statements
2023-05-11 00:44:48 +08:00

202 lines
6.9 KiB
C++

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <brpc/adaptive_connection_type.h>
#include <brpc/adaptive_protocol_type.h>
#include <brpc/channel.h>
#include <brpc/controller.h>
#include <butil/endpoint.h>
#include <fmt/format.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/service.h>
#include <parallel_hashmap/phmap.h>
#include <stddef.h>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "util/network_util.h"
namespace doris {
// Forward declarations of the two stub types this header instantiates
// BrpcClientCache with (see the aliases at the bottom of the file).
// Presumably these are the protobuf-generated service stubs; the full
// definitions must be visible wherever a member of the stub is actually used.
class PBackendService_Stub;
class PFunctionService_Stub;
} // namespace doris
// Map from an address string to a shared stub of type T.
// NOTE: this alias is declared at global scope, outside namespace doris.
// Template arguments after the allocator: `8` is phmap's submap parameter N
// (per the phmap documentation the map is split into 2^N submaps) and
// `std::mutex` is the lock type phmap uses to guard each submap.
template <typename T>
using StubMap = phmap::parallel_flat_hash_map<
std::string, std::shared_ptr<T>, std::hash<std::string>, std::equal_to<std::string>,
std::allocator<std::pair<const std::string, std::shared_ptr<T>>>, 8, std::mutex>;
namespace doris {
template <class T>
class BrpcClientCache {
public:
BrpcClientCache();
virtual ~BrpcClientCache();
std::shared_ptr<T> get_client(const butil::EndPoint& endpoint) {
return get_client(butil::endpoint2str(endpoint).c_str());
}
#ifdef BE_TEST
virtual std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
std::string host_port = fmt::format("{}:{}", taddr.hostname, taddr.port);
return get_client(host_port);
}
#else
std::shared_ptr<T> get_client(const TNetworkAddress& taddr) {
return get_client(taddr.hostname, taddr.port);
}
#endif
std::shared_ptr<T> get_client(const std::string& host, int port) {
std::string realhost;
realhost = host;
if (!is_valid_ip(host)) {
Status status = hostname_to_ip(host, realhost);
if (!status.ok()) {
LOG(WARNING) << "failed to get ip from host:" << status.to_string();
return nullptr;
}
}
std::string host_port = get_host_port(realhost, port);
return get_client(host_port);
}
std::shared_ptr<T> get_client(const std::string& host_port) {
std::shared_ptr<T> stub_ptr;
auto get_value = [&stub_ptr](const auto& v) { stub_ptr = v.second; };
if (LIKELY(_stub_map.if_contains(host_port, get_value))) {
return stub_ptr;
}
// new one stub and insert into map
auto stub = get_new_client_no_cache(host_port);
_stub_map.try_emplace_l(
host_port, [&stub](const auto& v) { stub = v.second; }, stub);
return stub;
}
std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
const std::string& protocol = "baidu_std",
const std::string& connect_type = "") {
brpc::ChannelOptions options;
if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
options.protocol = config::function_service_protocol;
} else {
options.protocol = protocol;
}
if (connect_type != "") {
options.connection_type = connect_type;
}
options.connect_timeout_ms = 2000;
options.max_retry = 10;
std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
int ret_code = 0;
if (host_port.find("://") == std::string::npos) {
ret_code = channel->Init(host_port.c_str(), &options);
} else {
ret_code =
channel->Init(host_port.c_str(), config::rpc_load_balancer.c_str(), &options);
}
if (ret_code) {
return nullptr;
}
return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
}
size_t size() { return _stub_map.size(); }
void clear() { _stub_map.clear(); }
size_t erase(const std::string& host_port) { return _stub_map.erase(host_port); }
size_t erase(const std::string& host, int port) {
std::string host_port = fmt::format("{}:{}", host, port);
return erase(host_port);
}
size_t erase(const butil::EndPoint& endpoint) {
return _stub_map.erase(butil::endpoint2str(endpoint).c_str());
}
bool exist(const std::string& host_port) {
return _stub_map.find(host_port) != _stub_map.end();
}
void get_all(std::vector<std::string>* endpoints) {
for (auto it = _stub_map.begin(); it != _stub_map.end(); ++it) {
endpoints->emplace_back(it->first.c_str());
}
}
bool available(std::shared_ptr<T> stub, const butil::EndPoint& endpoint) {
return available(stub, butil::endpoint2str(endpoint).c_str());
}
bool available(std::shared_ptr<T> stub, const std::string& host_port) {
if (!stub) {
LOG(WARNING) << "stub is null to: " << host_port;
return false;
}
std::string message = "hello doris!";
PHandShakeRequest request;
request.set_hello(message);
PHandShakeResponse response;
brpc::Controller cntl;
stub->hand_shake(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
LOG(WARNING) << "open brpc connection to " << host_port
<< " failed: " << cntl.ErrorText();
return false;
} else if (response.has_status() && response.has_hello() && response.hello() == message &&
response.status().status_code() == 0) {
return true;
} else {
LOG(WARNING) << "open brpc connection to " << host_port
<< " failed: " << response.DebugString();
return false;
}
}
bool available(std::shared_ptr<T> stub, const std::string& host, int port) {
std::string host_port = fmt::format("{}:{}", host, port);
return available(stub, host_port);
}
private:
StubMap<T> _stub_map;
};
// Convenience names for the two instantiations of BrpcClientCache used with
// the stub types forward-declared at the top of this header.
using InternalServiceClientCache = BrpcClientCache<PBackendService_Stub>;
// For this instantiation get_new_client_no_cache always takes the protocol
// from config::function_service_protocol (see the `if constexpr` there).
using FunctionServiceClientCache = BrpcClientCache<PFunctionService_Stub>;
} // namespace doris