[feature](binlog) Add ingest_binlog/http_get_snapshot limit download speed && Add async ingest_binlog (#26323)
@@ -1121,6 +1121,12 @@ DEFINE_String(default_tzfiles_path, "${DORIS_HOME}/zoneinfo");
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DEFINE_Int32(group_commit_max_queue_size, "65536");
 
+// Ingest binlog work pool size, -1 is disable, 0 is hardware concurrency
+DEFINE_Int32(ingest_binlog_work_pool_size, "-1");
+
+// Download binlog rate limit, unit is KB/s, 0 means no limit
+DEFINE_Int32(download_binlog_rate_limit_kbs, "0");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
@@ -1191,6 +1191,12 @@ DECLARE_String(default_tzfiles_path);
 // Max size(bytes) of group commit queues, used for mem back pressure.
 DECLARE_Int32(group_commit_max_queue_size);
 
+// Ingest binlog work pool size
+DECLARE_Int32(ingest_binlog_work_pool_size);
+
+// Download binlog rate limit, unit is KB/s
+DECLARE_Int32(download_binlog_rate_limit_kbs);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
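
Note: the two new knobs cooperate but act at different layers. download_binlog_rate_limit_kbs throttles the HTTP side that serves binlog/snapshot files, while ingest_binlog_work_pool_size decides whether the receiving BE runs ingest_binlog synchronously in the RPC thread or hands it to a worker pool. A condensed sketch of how the pool size is resolved in BackendService::create_service later in this commit:

    // -1 (the default): no pool is built, ingest_binlog stays synchronous.
    //  0: size the pool to the machine; N > 0: use N as the minimum thread count.
    auto thread_num = config::ingest_binlog_work_pool_size;
    if (thread_num == 0) {
        thread_num = std::thread::hardware_concurrency(); // requires <thread>
    }
    // The pool is then built with min_threads = thread_num, max_threads = 2 * thread_num.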
@@ -33,13 +33,20 @@
 #include "runtime/exec_env.h"
 
 namespace doris {
+namespace {
+static const std::string FILE_PARAMETER = "file";
+static const std::string TOKEN_PARAMETER = "token";
+static const std::string CHANNEL_PARAMETER = "channel";
+static const std::string CHANNEL_INGEST_BINLOG_TYPE = "ingest_binlog";
+} // namespace
 
-const std::string FILE_PARAMETER = "file";
-const std::string TOKEN_PARAMETER = "token";
-
-DownloadAction::DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
-                               int32_t num_workers)
-        : _exec_env(exec_env), _download_type(NORMAL), _num_workers(num_workers) {
+DownloadAction::DownloadAction(ExecEnv* exec_env,
+                               std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
+                               const std::vector<std::string>& allow_dirs, int32_t num_workers)
+        : _exec_env(exec_env),
+          _download_type(NORMAL),
+          _num_workers(num_workers),
+          _rate_limit_group(std::move(rate_limit_group)) {
     for (auto& dir : allow_dirs) {
         std::string p;
         Status st = io::global_local_filesystem()->canonicalize(dir, &p);
@@ -107,7 +114,13 @@ void DownloadAction::handle_normal(HttpRequest* req, const std::string& file_par
     if (is_dir) {
         do_dir_response(file_param, req);
     } else {
-        do_file_response(file_param, req);
+        const auto& channel = req->param(CHANNEL_PARAMETER);
+        bool ingest_binlog = (channel == CHANNEL_INGEST_BINLOG_TYPE);
+        if (ingest_binlog) {
+            do_file_response(file_param, req, _rate_limit_group.get());
+        } else {
+            do_file_response(file_param, req);
+        }
     }
 }
 
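Note: the throttle is strictly opt-in. Only requests that carry channel=ingest_binlog are attached to the rate limit group; every other download request keeps the old unthrottled path. And because do_file_response declares the group pointer with a nullptr default (see the http utils header below), the two branches collapse to the same behavior whenever rate limiting is disabled:

    // With download_binlog_rate_limit_kbs == 0, _rate_limit_group stays null and
    // both branches are equivalent to the old two-argument call:
    do_file_response(file_param, req, /*rate_limit_group=*/nullptr);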
@@ -24,6 +24,8 @@
 #include "http/http_handler.h"
 #include "util/threadpool.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 class ExecEnv;
@@ -36,8 +38,9 @@ class HttpRequest;
 // We use parameter named 'file' to specify the static resource path, it is an absolute path.
 class DownloadAction : public HttpHandler {
 public:
-    DownloadAction(ExecEnv* exec_env, const std::vector<std::string>& allow_dirs,
-                   int32_t num_workers = 0);
+    DownloadAction(ExecEnv* exec_env,
+                   std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group,
+                   const std::vector<std::string>& allow_dirs, int32_t num_workers = 0);
 
     // for load error
     DownloadAction(ExecEnv* exec_env, const std::string& error_log_root_dir);
@@ -67,6 +70,8 @@ private:
     std::string _error_log_root_dir;
     int32_t _num_workers;
     std::unique_ptr<ThreadPool> _download_workers;
+
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
 }; // end class DownloadAction
 
 } // end namespace doris
@@ -21,8 +21,10 @@
 #include <fmt/ranges.h>
 
+#include <cstdint>
 #include <limits>
 #include <stdexcept>
 #include <string_view>
+#include <utility>
 #include <vector>
 
 #include "common/config.h"
@@ -96,7 +98,7 @@ void handle_get_binlog_info(HttpRequest* req) {
 }
 
 /// handle get segment file, need tablet_id, rowset_id && index
-void handle_get_segment_file(HttpRequest* req) {
+void handle_get_segment_file(HttpRequest* req, bufferevent_rate_limit_group* rate_limit_group) {
     // Step 1: get download file path
     std::string segment_file_path;
     try {
@@ -125,7 +127,7 @@ void handle_get_segment_file(HttpRequest* req) {
         LOG(WARNING) << "file not exist, file path: " << segment_file_path;
         return;
     }
-    do_file_response(segment_file_path, req);
+    do_file_response(segment_file_path, req, rate_limit_group);
 }
 
 void handle_get_rowset_meta(HttpRequest* req) {
@@ -149,7 +151,9 @@ void handle_get_rowset_meta(HttpRequest* req) {
 
 } // namespace
 
-DownloadBinlogAction::DownloadBinlogAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
+DownloadBinlogAction::DownloadBinlogAction(
+        ExecEnv* exec_env, std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group)
+        : _exec_env(exec_env), _rate_limit_group(std::move(rate_limit_group)) {}
 
 void DownloadBinlogAction::handle(HttpRequest* req) {
     VLOG_CRITICAL << "accept one download binlog request " << req->debug_string();
@@ -178,7 +182,7 @@ void DownloadBinlogAction::handle(HttpRequest* req) {
     if (method == "get_binlog_info") {
         handle_get_binlog_info(req);
     } else if (method == "get_segment_file") {
-        handle_get_segment_file(req);
+        handle_get_segment_file(req, _rate_limit_group.get());
     } else if (method == "get_rowset_meta") {
         handle_get_rowset_meta(req);
     } else {
@@ -17,12 +17,15 @@
 
 #pragma once
 
+#include <memory>
 #include <string>
 #include <vector>
 
 #include "common/status.h"
 #include "http/http_handler.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 class ExecEnv;
@@ -30,7 +33,8 @@ class HttpRequest;
 
 class DownloadBinlogAction : public HttpHandler {
 public:
-    DownloadBinlogAction(ExecEnv* exec_env);
+    DownloadBinlogAction(ExecEnv* exec_env,
+                         std::shared_ptr<bufferevent_rate_limit_group> rate_limit_group);
     virtual ~DownloadBinlogAction() = default;
 
     void handle(HttpRequest* req) override;
@@ -40,6 +44,7 @@ private:
 
 private:
     ExecEnv* _exec_env;
+    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
 };
 
 } // namespace doris
@@ -84,7 +84,17 @@ static int on_connection(struct evhttp_request* req, void* param) {
 EvHttpServer::EvHttpServer(int port, int num_workers)
         : _port(port), _num_workers(num_workers), _real_port(0) {
     _host = BackendOptions::get_service_bind_address();
 
+    evthread_use_pthreads();
+    DCHECK_GT(_num_workers, 0);
+    _event_bases.resize(_num_workers);
+    for (int i = 0; i < _num_workers; ++i) {
+        std::shared_ptr<event_base> base(event_base_new(),
+                                         [](event_base* base) { event_base_free(base); });
+        CHECK(base != nullptr) << "Couldn't create an event_base.";
+        std::lock_guard lock(_event_bases_lock);
+        _event_bases[i] = base;
+    }
 }
 
 EvHttpServer::EvHttpServer(const std::string& host, int port, int num_workers)
@@ -107,34 +117,28 @@ void EvHttpServer::start() {
                               .set_min_threads(_num_workers)
                               .set_max_threads(_num_workers)
                               .build(&_workers));
 
-    evthread_use_pthreads();
-    _event_bases.resize(_num_workers);
     for (int i = 0; i < _num_workers; ++i) {
-        CHECK(_workers->submit_func([this, i]() {
-                          std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
-                              event_base_free(base);
-                          });
-                          CHECK(base != nullptr) << "Couldn't create an event_base.";
-                          {
-                              std::lock_guard<std::mutex> lock(_event_bases_lock);
-                              _event_bases[i] = base;
-                          }
+        auto status = _workers->submit_func([this, i]() {
+            std::shared_ptr<event_base> base;
+            {
+                std::lock_guard lock(_event_bases_lock);
+                base = _event_bases[i];
+            }
 
-                          /* Create a new evhttp object to handle requests. */
-                          std::shared_ptr<evhttp> http(evhttp_new(base.get()),
-                                                       [](evhttp* http) { evhttp_free(http); });
-                          CHECK(http != nullptr) << "Couldn't create an evhttp.";
+            /* Create a new evhttp object to handle requests. */
+            std::shared_ptr<evhttp> http(evhttp_new(base.get()),
+                                         [](evhttp* http) { evhttp_free(http); });
+            CHECK(http != nullptr) << "Couldn't create an evhttp.";
 
-                          auto res = evhttp_accept_socket(http.get(), _server_fd);
-                          CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
+            auto res = evhttp_accept_socket(http.get(), _server_fd);
+            CHECK(res >= 0) << "evhttp accept socket failed, res=" << res;
 
-                          evhttp_set_newreqcb(http.get(), on_connection, this);
-                          evhttp_set_gencb(http.get(), on_request, this);
+            evhttp_set_newreqcb(http.get(), on_connection, this);
+            evhttp_set_gencb(http.get(), on_request, this);
 
-                          event_base_dispatch(base.get());
-                      })
-                      .ok());
+            event_base_dispatch(base.get());
+        });
+        CHECK(status.ok());
     }
 }
 
@@ -55,6 +55,11 @@ public:
     // get real port
     int get_real_port() const { return _real_port; }
 
+    std::vector<std::shared_ptr<event_base>> get_event_bases() {
+        std::lock_guard lock(_event_bases_lock);
+        return _event_bases;
+    }
+
 private:
     Status _bind();
     HttpHandler* _find_handler(HttpRequest* req);
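
Note: moving event_base creation from start() into the constructor is what makes the new get_event_bases() accessor safe to call before the server threads run; HttpService can then build one bufferevent_rate_limit_group on one of those bases and share it with the download handlers. The call site is outside this excerpt, so the wiring below is a plausible sketch, not the literal code (the member names are assumptions):

    // Hypothetical wiring inside HttpService setup:
    auto event_bases = _ev_http_server->get_event_bases();
    auto rate_limit_group = get_rate_limit_group(event_bases[0].get()); // helper added below
    auto download_action = new DownloadBinlogAction(_env, rate_limit_group);

The group's refill timer lives on the event_base it was created with, which is why the bases must exist up front.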
@@ -18,6 +18,7 @@
 #include "http/http_channel.h"
 
 #include <event2/buffer.h>
+#include <event2/bufferevent.h>
 #include <event2/http.h>
 
 #include <algorithm>
@@ -69,11 +70,17 @@ void HttpChannel::send_reply(HttpRequest* request, HttpStatus status, const std:
     evbuffer_free(evb);
 }
 
-void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size) {
+void HttpChannel::send_file(HttpRequest* request, int fd, size_t off, size_t size,
+                            bufferevent_rate_limit_group* rate_limit_group) {
     auto evb = evbuffer_new();
     evbuffer_add_file(evb, fd, off, size);
-    evhttp_send_reply(request->get_evhttp_request(), HttpStatus::OK,
-                      default_reason(HttpStatus::OK).c_str(), evb);
+    auto* evhttp_request = request->get_evhttp_request();
+    if (rate_limit_group) {
+        auto* evhttp_connection = evhttp_request_get_connection(evhttp_request);
+        auto* buffer_event = evhttp_connection_get_bufferevent(evhttp_connection);
+        bufferevent_add_to_rate_limit_group(buffer_event, rate_limit_group);
+    }
+    evhttp_send_reply(evhttp_request, HttpStatus::OK, default_reason(HttpStatus::OK).c_str(), evb);
     evbuffer_free(evb);
 }
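
Note: bufferevent_add_to_rate_limit_group ties this connection's bufferevent to a token bucket shared by every other grouped connection, so the configured limit is a global cap across all concurrent binlog downloads rather than a per-connection one. A minimal, self-contained sketch of the libevent pattern in use (the sizes are illustrative, not from the commit):

    #include <event2/bufferevent.h>
    #include <event2/event.h>

    // libevent token-bucket rates are bytes per *tick*; with a 100 ms tick,
    // 128 KiB/tick sustains about 1.25 MiB/s, with a 256 KiB burst per tick.
    struct timeval tick = {0, 100 * 1000};
    ev_token_bucket_cfg* cfg = ev_token_bucket_cfg_new(
            128 * 1024, 256 * 1024, 128 * 1024, 256 * 1024, &tick);
    bufferevent_rate_limit_group* group = bufferevent_rate_limit_group_new(base, cfg);

    // Each response to be throttled joins the shared bucket:
    bufferevent_add_to_rate_limit_group(bev, group);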
@@ -23,6 +23,7 @@
 #include "http/http_status.h"
 
+struct bufferevent_rate_limit_group;
 namespace doris {
 
 class HttpRequest;
@@ -43,7 +44,8 @@ public:
 
     static void send_reply(HttpRequest* request, HttpStatus status, const std::string& content);
 
-    static void send_file(HttpRequest* request, int fd, size_t off, size_t size);
+    static void send_file(HttpRequest* request, int fd, size_t off, size_t size,
+                          bufferevent_rate_limit_group* rate_limit_group = nullptr);
 
     static bool compress_content(const std::string& accept_encoding, const std::string& input,
                                  std::string* output);
@@ -124,7 +124,8 @@ std::string get_content_type(const std::string& file_name) {
     return "";
 }
 
-void do_file_response(const std::string& file_path, HttpRequest* req) {
+void do_file_response(const std::string& file_path, HttpRequest* req,
+                      bufferevent_rate_limit_group* rate_limit_group) {
     if (file_path.find("..") != std::string::npos) {
         LOG(WARNING) << "Not allowed to read relative path: " << file_path;
         HttpChannel::send_error(req, HttpStatus::FORBIDDEN);
@@ -165,7 +166,7 @@ void do_file_response(const std::string& file_path, HttpRequest* req) {
         return;
     }
 
-    HttpChannel::send_file(req, fd, 0, file_size);
+    HttpChannel::send_file(req, fd, 0, file_size, rate_limit_group);
 }
 
 void do_dir_response(const std::string& dir_path, HttpRequest* req) {
@@ -22,6 +22,8 @@
 #include "common/utils.h"
 #include "http/http_request.h"
 
+struct bufferevent_rate_limit_group;
+
 namespace doris {
 
 struct AuthInfo;
@@ -34,7 +36,8 @@ bool parse_basic_auth(const HttpRequest& req, std::string* user, std::string* pa
 
 bool parse_basic_auth(const HttpRequest& req, AuthInfo* auth);
 
-void do_file_response(const std::string& dir_path, HttpRequest* req);
+void do_file_response(const std::string& dir_path, HttpRequest* req,
+                      bufferevent_rate_limit_group* rate_limit_group = nullptr);
 
 void do_dir_response(const std::string& dir_path, HttpRequest* req);
 
@@ -140,6 +140,7 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
     // case 1: user start a new txn, rowset = null
     // case 2: loading txn from meta env
     TabletTxnInfo load_info(load_id, nullptr, ingest);
+    load_info.prepare();
    txn_tablet_map[key][tablet_info] = load_info;
    _insert_txn_partition_map_unlocked(transaction_id, partition_id);
    VLOG_NOTICE << "add transaction to engine successfully."
@@ -162,6 +163,29 @@ Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr&
                                tablet->tablet_id(), tablet->tablet_uid(), version, stats);
 }
 
+void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id,
+                           TTabletId tablet_id, TabletUid tablet_uid) {
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, tablet_uid);
+
+    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
+        return;
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+        tablet_txn_info_iter == tablet_txn_info_map.end()) {
+        return;
+    } else {
+        auto& txn_info = tablet_txn_info_iter->second;
+        txn_info.abort();
+    }
+}
+
 // delete the txn from manager if it is not committed(not have a valid rowset)
 Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet,
                                 TTransactionId transaction_id) {
@@ -217,6 +241,7 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
                   << " partition_id=" << partition_id << " transaction_id=" << transaction_id
                   << " tablet_id=" << tablet_id;
     }
+
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, tablet_uid);
     if (rowset_ptr == nullptr) {
@@ -252,28 +277,30 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
         // case 1: user commit rowset, then the load id must be equal
         TabletTxnInfo& load_info = load_itr->second;
         // check if load id is equal
-        if (load_info.load_id.hi() == load_id.hi() && load_info.load_id.lo() == load_id.lo() &&
-            load_info.rowset != nullptr &&
-            load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
-            // find a rowset with same rowset id, then it means a duplicate call
+        if (load_info.rowset == nullptr) {
+            break;
+        }
+
+        if (load_info.load_id.hi() != load_id.hi() || load_info.load_id.lo() != load_id.lo()) {
+            break;
+        }
+
+        // find a rowset with same rowset id, then it means a duplicate call
+        if (load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
             LOG(INFO) << "find rowset exists when commit transaction to engine."
                       << "partition_id: " << key.first << ", transaction_id: " << key.second
                       << ", tablet: " << tablet_info.to_string()
                       << ", rowset_id: " << load_info.rowset->rowset_id();
             return Status::OK();
-        } else if (load_info.load_id.hi() == load_id.hi() &&
-                   load_info.load_id.lo() == load_id.lo() && load_info.rowset != nullptr &&
-                   load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) {
-            // find a rowset with different rowset id, then it should not happen, just return errors
-            return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
-                    "find rowset exists when commit transaction to engine. but rowset ids are not "
-                    "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, "
-                    "new rowset_id: {}",
-                    key.first, key.second, tablet_info.to_string(),
-                    load_info.rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string());
-        } else {
-            break;
         }
+
+        // find a rowset with different rowset id, then it should not happen, just return errors
+        return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>(
+                "find rowset exists when commit transaction to engine. but rowset ids are not "
+                "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new "
+                "rowset_id: {}",
+                key.first, key.second, tablet_info.to_string(),
+                load_info.rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string());
     } while (false);
 
     // if not in recovery mode, then should persist the meta to meta env
@@ -301,6 +328,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id,
                 load_info.delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id()));
             }
         }
+        load_info.commit();
+
         txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
         txn_tablet_map[key][tablet_info] = load_info;
         _insert_txn_partition_map_unlocked(transaction_id, partition_id);
@@ -453,30 +482,36 @@ Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transa
                                 TTabletId tablet_id, TabletUid tablet_uid) {
     pair<int64_t, int64_t> key(partition_id, transaction_id);
     TabletInfo tablet_info(tablet_id, tablet_uid);
 
     std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id));
     txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
 
     auto it = txn_tablet_map.find(key);
-    if (it != txn_tablet_map.end()) {
-        auto load_itr = it->second.find(tablet_info);
-        if (load_itr != it->second.end()) {
-            // found load for txn,tablet
-            // case 1: user commit rowset, then the load id must be equal
-            TabletTxnInfo& load_info = load_itr->second;
-            if (load_info.rowset != nullptr) {
-                return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
-                        "if rowset is not null, it means other thread may commit the rowset should "
-                        "not delete txn any more");
-            }
-        }
-        it->second.erase(tablet_info);
-        LOG(INFO) << "rollback transaction from engine successfully."
-                  << " partition_id: " << key.first << ", transaction_id: " << key.second
-                  << ", tablet: " << tablet_info.to_string();
-        if (it->second.empty()) {
-            txn_tablet_map.erase(it);
-            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
-        }
+    if (it == txn_tablet_map.end()) {
+        return Status::OK();
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto load_itr = tablet_txn_info_map.find(tablet_info);
+        load_itr != tablet_txn_info_map.end()) {
+        // found load for txn,tablet
+        // case 1: user commit rowset, then the load id must be equal
+        TabletTxnInfo& load_info = load_itr->second;
+        if (load_info.rowset != nullptr) {
+            return Status::Error<TRANSACTION_ALREADY_COMMITTED>(
+                    "if rowset is not null, it means other thread may commit the rowset should "
+                    "not delete txn any more");
+        }
+    }
+
+    tablet_txn_info_map.erase(tablet_info);
+    LOG(INFO) << "rollback transaction from engine successfully."
+              << " partition_id: " << key.first << ", transaction_id: " << key.second
+              << ", tablet: " << tablet_info.to_string();
+    if (tablet_txn_info_map.empty()) {
+        txn_tablet_map.erase(it);
+        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     }
     return Status::OK();
 }
@@ -650,18 +685,6 @@ void TxnManager::get_all_commit_tablet_txn_info_by_tablet(
     }
 }
 
-bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_id,
-                         TTabletId tablet_id, TabletUid tablet_uid) {
-    pair<int64_t, int64_t> key(partition_id, transaction_id);
-    TabletInfo tablet_info(tablet_id, tablet_uid);
-    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
-    txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id);
-    auto it = txn_tablet_map.find(key);
-    bool found = it != txn_tablet_map.end() && it->second.find(tablet_info) != it->second.end();
-
-    return found;
-}
-
 void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) {
     int64_t now = UnixSeconds();
     // traverse the txn map, and get all expired txns
@@ -671,13 +694,15 @@ void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>
             auto txn_id = it.first.second;
             for (auto& t_map : it.second) {
                 double diff = difftime(now, t_map.second.creation_time);
-                if (diff >= config::pending_data_expire_time_sec) {
-                    (*expire_txn_map)[t_map.first].push_back(txn_id);
-                    if (VLOG_IS_ON(3)) {
-                        VLOG_NOTICE << "find expired txn."
-                                    << " tablet=" << t_map.first.to_string()
-                                    << " transaction_id=" << txn_id << " exist_sec=" << diff;
-                    }
+                if (diff < config::pending_data_expire_time_sec) {
+                    continue;
+                }
+
+                (*expire_txn_map)[t_map.first].push_back(txn_id);
+                if (VLOG_IS_ON(3)) {
+                    VLOG_NOTICE << "find expired txn."
+                                << " tablet=" << t_map.first.to_string()
+                                << " transaction_id=" << txn_id << " exist_sec=" << diff;
                 }
             }
         }
@@ -796,4 +821,27 @@ void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, i
     _tablet_version_cache->release(handle);
 }
 
+TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
+                                   TTabletId tablet_id, TabletUid tablet_uid) {
+    pair<int64_t, int64_t> key(partition_id, transaction_id);
+    TabletInfo tablet_info(tablet_id, tablet_uid);
+
+    std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
+
+    auto& txn_tablet_map = _get_txn_tablet_map(transaction_id);
+    auto it = txn_tablet_map.find(key);
+    if (it == txn_tablet_map.end()) {
+        return TxnState::NOT_FOUND;
+    }
+
+    auto& tablet_txn_info_map = it->second;
+    if (auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info);
+        tablet_txn_info_iter == tablet_txn_info_map.end()) {
+        return TxnState::NOT_FOUND;
+    } else {
+        const auto& txn_info = tablet_txn_info_iter->second;
+        return txn_info.state;
+    }
+}
+
 } // namespace doris
@@ -51,6 +51,15 @@ class DeltaWriter;
 class OlapMeta;
 struct TabletPublishStatistics;
 
+enum class TxnState {
+    NOT_FOUND = 0,
+    PREPARED = 1,
+    COMMITTED = 2,
+    ROLLEDBACK = 3,
+    ABORTED = 4,
+    DELETED = 5,
+};
+
 struct TabletTxnInfo {
     PUniqueId load_id;
     RowsetSharedPtr rowset;
@@ -61,6 +70,9 @@ struct TabletTxnInfo {
     int64_t creation_time;
     bool ingest {false};
     std::shared_ptr<PartialUpdateInfo> partial_update_info;
+    TxnState state {TxnState::PREPARED};
+
+    TabletTxnInfo() = default;
 
     TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
             : load_id(load_id), rowset(rowset), creation_time(UnixSeconds()) {}
@@ -77,7 +89,14 @@ struct TabletTxnInfo {
               rowset_ids(ids),
               creation_time(UnixSeconds()) {}
 
-    TabletTxnInfo() {}
+    void prepare() { state = TxnState::PREPARED; }
+    void commit() { state = TxnState::COMMITTED; }
+    void rollback() { state = TxnState::ROLLEDBACK; }
+    void abort() {
+        if (state == TxnState::PREPARED) {
+            state = TxnState::ABORTED;
+        }
+    }
 };
 
 struct CommitTabletTxnInfo {
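
Note: the helpers above define a small state machine for a tablet txn. abort() is intentionally conditional: only a txn still in PREPARED can move to ABORTED, so the abort issued from the ingest error path can never demote a txn that another thread already drove to COMMITTED. Sketched transitions (ROLLEDBACK has a helper but no caller in this excerpt; DELETED is declared but never assigned here):

    // prepare_txn()   -> state = PREPARED
    // commit_txn()    -> state = COMMITTED
    // abort_txn()     -> state = ABORTED, only if state == PREPARED
    // rollback()      -> state = ROLLEDBACK
    // get_txn_state() -> TxnState::NOT_FOUND when no entry exists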
@@ -145,6 +164,10 @@ public:
                        TTabletId tablet_id, TabletUid tablet_uid, const Version& version,
                        TabletPublishStatistics* stats);
 
+    // only abort not committed txn
+    void abort_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
+                   TabletUid tablet_uid);
+
     // delete the txn from manager if it is not committed(not have a valid rowset)
     Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id,
                         TTabletId tablet_id, TabletUid tablet_uid);
@@ -163,10 +186,6 @@ public:
 
     void get_all_related_tablets(std::set<TabletInfo>* tablet_infos);
 
-    // Just check if the txn exists.
-    bool has_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
-                 TabletUid tablet_uid);
-
     // Get all expired txns and save them in expire_txn_map.
     // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablets.
     void build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map);
@@ -195,6 +214,9 @@ public:
     int64_t get_txn_by_tablet_version(int64_t tablet_id, int64_t version);
     void update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id);
 
+    TxnState get_txn_state(TPartitionId partition_id, TTransactionId transaction_id,
+                           TTabletId tablet_id, TabletUid tablet_uid);
+
 private:
     using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
 
@@ -467,7 +467,7 @@ Status SnapshotLoader::remote_http_download(
 
     for (const auto& filename : filename_list) {
         std::string remote_file_url = fmt::format(
-                "http://{}:{}/api/_tablet/_download?token={}&file={}/{}",
+                "http://{}:{}/api/_tablet/_download?token={}&file={}/{}&channel=ingest_binlog",
                 remote_tablet_snapshot.remote_be_addr.hostname,
                 remote_tablet_snapshot.remote_be_addr.port, remote_tablet_snapshot.remote_token,
                 remote_tablet_snapshot.remote_snapshot_path, filename);
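
Note: this is the client half of the channel parameter handled in DownloadAction::handle_normal above; snapshot files pulled during binlog ingestion now identify themselves so the serving BE can throttle them. With hypothetical values substituted into the format string, the resulting URL looks like:

    http://192.0.2.10:8040/api/_tablet/_download?token=abc123&file=/path/to/snapshot/10042/0_0.dat&channel=ingest_binlog

(The address, token, and file path are placeholders, not values from the commit.)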
@@ -36,6 +36,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <thread>
 #include <utility>
 #include <vector>
 
@@ -63,6 +64,7 @@
 #include "runtime/stream_load/stream_load_recorder.h"
 #include "util/arrow/row_batch.h"
 #include "util/defer_op.h"
+#include "util/threadpool.h"
 #include "util/thrift_server.h"
 #include "util/uid_util.h"
 
@@ -79,6 +81,283 @@ class TTransportException;
 
 namespace doris {
 
+namespace {
+constexpr uint64_t kMaxTimeoutMs = 3000; // 3s
+struct IngestBinlogArg {
+    int64_t txn_id;
+    int64_t partition_id;
+    int64_t local_tablet_id;
+    TabletSharedPtr local_tablet;
+    TIngestBinlogRequest request;
+    TStatus* tstatus;
+};
+
+void _ingest_binlog(IngestBinlogArg* arg) {
+    auto txn_id = arg->txn_id;
+    auto partition_id = arg->partition_id;
+    auto local_tablet_id = arg->local_tablet_id;
+    const auto& local_tablet = arg->local_tablet;
+    const auto& local_tablet_uid = local_tablet->tablet_uid();
+
+    auto& request = arg->request;
+
+    TStatus tstatus;
+    Defer defer {[=, &tstatus, ingest_binlog_tstatus = arg->tstatus]() {
+        LOG(INFO) << "ingest binlog. result: " << apache::thrift::ThriftDebugString(tstatus);
+        if (tstatus.status_code != TStatusCode::OK) {
+            // abort txn
+            StorageEngine::instance()->txn_manager()->abort_txn(partition_id, txn_id,
+                                                                local_tablet_id, local_tablet_uid);
+        }
+
+        if (ingest_binlog_tstatus) {
+            *ingest_binlog_tstatus = std::move(tstatus);
+        }
+    }};
+
+    auto set_tstatus = [&tstatus](TStatusCode::type code, std::string error_msg) {
+        tstatus.__set_status_code(code);
+        tstatus.__isset.error_msgs = true;
+        tstatus.error_msgs.push_back(std::move(error_msg));
+    };
+
+    // Step 3: get binlog info
+    auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
+                                      request.remote_port);
+    constexpr int max_retry = 3;
+
+    auto get_binlog_info_url =
+            fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url,
+                        "get_binlog_info", request.remote_tablet_id, request.binlog_version);
+    std::string binlog_info;
+    auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(get_binlog_info_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&binlog_info);
+    };
+    auto status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, ":");
+    // TODO(Drogon): check binlog info content is right
+    DCHECK(binlog_info_parts.size() == 2);
+    const std::string& remote_rowset_id = binlog_info_parts[0];
+    int64_t num_segments = std::stoll(binlog_info_parts[1]);
+
+    // Step 4: get rowset meta
+    auto get_rowset_meta_url = fmt::format(
+            "{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url,
+            "get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version);
+    std::string rowset_meta_str;
+    auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) {
+        RETURN_IF_ERROR(client->init(get_rowset_meta_url));
+        client->set_timeout_ms(kMaxTimeoutMs);
+        return client->execute(&rowset_meta_str);
+    };
+    status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
+    if (!status.ok()) {
+        LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetMetaPB rowset_meta_pb;
+    if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
+        LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url;
+        status = Status::InternalError("failed to parse rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    // rewrite rowset meta
+    rowset_meta_pb.set_tablet_id(local_tablet_id);
+    rowset_meta_pb.set_partition_id(partition_id);
+    rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
+    rowset_meta_pb.set_txn_id(txn_id);
+    rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
+    auto rowset_meta = std::make_shared<RowsetMeta>();
+    if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
+        LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url;
+        status = Status::InternalError("failed to init rowset meta");
+        status.to_thrift(&tstatus);
+        return;
+    }
+    RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
+    rowset_meta->set_rowset_id(new_rowset_id);
+    rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
+
+    // Step 5: get all segment files
+    // Step 5.1: get all segment files size
+    std::vector<std::string> segment_file_urls;
+    segment_file_urls.reserve(num_segments);
+    std::vector<uint64_t> segment_file_sizes;
+    segment_file_sizes.reserve(num_segments);
+    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
+        auto get_segment_file_size_url = fmt::format(
+                "{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url,
+                "get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index);
+        uint64_t segment_file_size;
+        auto get_segment_file_size_cb = [&get_segment_file_size_url,
+                                         &segment_file_size](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_size_url));
+            client->set_timeout_ms(kMaxTimeoutMs);
+            RETURN_IF_ERROR(client->head());
+            return client->get_content_length(&segment_file_size);
+        };
+
+        status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_size_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+
+        segment_file_sizes.push_back(segment_file_size);
+        segment_file_urls.push_back(std::move(get_segment_file_size_url));
+    }
+
+    // Step 5.2: check data capacity
+    uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), 0);
+    if (!local_tablet->can_add_binlog(total_size)) {
+        LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size
+                     << ", tablet=" << local_tablet->tablet_id();
+        status = Status::InternalError("no enough space");
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 5.3: get all segment files
+    for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
+        auto segment_file_size = segment_file_sizes[segment_index];
+        auto get_segment_file_url = segment_file_urls[segment_index];
+
+        uint64_t estimate_timeout =
+                segment_file_size / config::download_low_speed_limit_kbps / 1024;
+        if (estimate_timeout < config::download_low_speed_time) {
+            estimate_timeout = config::download_low_speed_time;
+        }
+
+        auto local_segment_path = BetaRowset::segment_file_path(
+                local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index);
+        LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url,
+                                 local_segment_path);
+        auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size,
+                                    estimate_timeout](HttpClient* client) {
+            RETURN_IF_ERROR(client->init(get_segment_file_url));
+            client->set_timeout_ms(estimate_timeout * 1000);
+            RETURN_IF_ERROR(client->download(local_segment_path));
+
+            std::error_code ec;
+            // Check file length
+            uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec);
+            if (ec) {
+                LOG(WARNING) << "download file error" << ec.message();
+                return Status::IOError("can't retrive file_size of {}, due to {}",
+                                       local_segment_path, ec.message());
+            }
+            if (local_file_size != segment_file_size) {
+                LOG(WARNING) << "download file length error"
+                             << ", get_segment_file_url=" << get_segment_file_url
+                             << ", file_size=" << segment_file_size
+                             << ", local_file_size=" << local_file_size;
+                return Status::InternalError("downloaded file size is not equal");
+            }
+            chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
+            return Status::OK();
+        };
+
+        auto status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_cb);
+        if (!status.ok()) {
+            LOG(WARNING) << "failed to get segment file from " << get_segment_file_url
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+    }
+
+    // Step 6: create rowset && calculate delete bitmap && commit
+    // Step 6.1: create rowset
+    RowsetSharedPtr rowset;
+    status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
+                                          local_tablet->tablet_path(), rowset_meta, &rowset);
+
+    if (!status) {
+        LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet"
+                     << ". rowset_id: " << rowset_meta_pb.rowset_id()
+                     << ", rowset_type: " << rowset_meta_pb.rowset_type()
+                     << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id
+                     << ", status=" << status.to_string();
+        status.to_thrift(&tstatus);
+        return;
+    }
+
+    // Step 6.2 calculate delete bitmap before commit
+    auto calc_delete_bitmap_token =
+            StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
+    RowsetIdUnorderedSet pre_rowset_ids;
+    if (local_tablet->enable_unique_key_merge_on_write()) {
+        auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+        std::vector<segment_v2::SegmentSharedPtr> segments;
+        status = beta_rowset->load_segments(&segments);
+        if (!status) {
+            LOG(WARNING) << "failed to load segments from rowset"
+                         << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id
+                         << ", status=" << status.to_string();
+            status.to_thrift(&tstatus);
+            return;
+        }
+        if (segments.size() > 1) {
+            // calculate delete bitmap between segments
+            status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
+                                                                       delete_bitmap);
+            if (!status) {
+                LOG(WARNING) << "failed to calculate delete bitmap"
+                             << ". tablet_id: " << local_tablet->tablet_id()
+                             << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id
+                             << ", status=" << status.to_string();
+                status.to_thrift(&tstatus);
+                return;
+            }
+        }
+
+        static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
+                rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
+                calc_delete_bitmap_token.get(), nullptr));
+        static_cast<void>(calc_delete_bitmap_token->wait());
+    }
+
+    // Step 6.3: commit txn
+    Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
+            local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
+            rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),
+            rowset_meta->load_id(), rowset, false);
+    if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
+        auto err_msg = fmt::format(
+                "failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, "
+                "txn_id={}, status={}",
+                rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
+                rowset_meta->txn_id(), commit_txn_status.to_string());
+        LOG(WARNING) << err_msg;
+        set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
+        return;
+    }
+
+    if (local_tablet->enable_unique_key_merge_on_write()) {
+        StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
+                partition_id, txn_id, local_tablet_id, local_tablet->tablet_uid(), true,
+                delete_bitmap, pre_rowset_ids, nullptr);
+    }
+
+    tstatus.__set_status_code(TStatusCode::OK);
+}
+} // namespace
+
 using apache::thrift::TException;
 using apache::thrift::TProcessor;
 using apache::thrift::TMultiplexedProcessor;
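
Note on the hoisted helper: on any failure the Defer both logs the thrift status and calls the new abort_txn, which only flips a still-PREPARED txn to ABORTED, so a racing successful commit is left intact. The per-segment timeout is also worth decoding: segment_file_size / download_low_speed_limit_kbps / 1024 treats the configured low-speed floor (KB/s) as the worst acceptable transfer rate and yields seconds, clamped below by download_low_speed_time. A worked example with assumed config values (both numbers are illustrative, not this commit's defaults):

    // Assume download_low_speed_limit_kbps = 50 and download_low_speed_time = 300.
    uint64_t segment_file_size = 512ULL * 1024 * 1024;          // 512 MiB
    uint64_t estimate_timeout = segment_file_size / 50 / 1024;  // = 10485 seconds
    // A 1 MiB segment gives 20 s, below the 300 s floor, so it is raised to 300 s.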
@@ -90,19 +369,33 @@ BackendService::BackendService(ExecEnv* exec_env)
 
 Status BackendService::create_service(ExecEnv* exec_env, int port,
                                       std::unique_ptr<ThriftServer>* server) {
-    std::shared_ptr<BackendService> handler(new BackendService(exec_env));
+    auto service = std::make_shared<BackendService>(exec_env);
     // TODO: do we want a BoostThreadFactory?
     // TODO: we want separate thread factories here, so that fe requests can't starve
     // be requests
     std::shared_ptr<ThreadFactory> thread_factory(new ThreadFactory());
 
-    std::shared_ptr<TProcessor> be_processor(new BackendServiceProcessor(handler));
+    // std::shared_ptr<TProcessor> be_processor = std::make_shared<BackendServiceProcessor>(service);
+    auto be_processor = std::make_shared<BackendServiceProcessor>(service);
 
     *server = std::make_unique<ThriftServer>("backend", be_processor, port,
                                              config::be_service_threads);
 
     LOG(INFO) << "Doris BackendService listening on " << port;
 
+    auto thread_num = config::ingest_binlog_work_pool_size;
+    if (thread_num < 0) {
+        LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, so we will in sync mode",
+                                 thread_num);
+        return Status::OK();
+    }
+
+    if (thread_num == 0) {
+        thread_num = std::thread::hardware_concurrency();
+    }
+    static_cast<void>(doris::ThreadPoolBuilder("IngestBinlog")
+                              .set_min_threads(thread_num)
+                              .set_max_threads(thread_num * 2)
+                              .build(&(service->_ingest_binlog_workers)));
+    LOG(INFO) << fmt::format("ingest binlog thread pool size is {}, in async mode", thread_num);
     return Status::OK();
 }
@@ -396,8 +689,6 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
                                    const TIngestBinlogRequest& request) {
     LOG(INFO) << "ingest binlog. request: " << apache::thrift::ThriftDebugString(request);
 
-    constexpr uint64_t kMaxTimeoutMs = 1000;
-
     TStatus tstatus;
     Defer defer {[&result, &tstatus]() {
         result.__set_status(tstatus);
@@ -452,6 +743,12 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
         set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
         return;
     }
+    if (!request.__isset.local_tablet_id) {
+        auto error_msg = "local_tablet_id is empty";
+        LOG(WARNING) << error_msg;
+        set_tstatus(TStatusCode::ANALYSIS_ERROR, error_msg);
+        return;
+    }
     if (!request.__isset.load_id) {
         auto error_msg = "load_id is empty";
         LOG(WARNING) << error_msg;
@ -486,239 +783,105 @@ void BackendService::ingest_binlog(TIngestBinlogResult& result,
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 3: get binlog info
|
||||
auto binlog_api_url = fmt::format("http://{}:{}/api/_binlog/_download", request.remote_host,
|
||||
request.remote_port);
|
||||
constexpr int max_retry = 3;
|
||||
bool is_async = (_ingest_binlog_workers != nullptr);
|
||||
result.__set_is_async(is_async);
|
||||
|
||||
auto get_binlog_info_url =
|
||||
fmt::format("{}?method={}&tablet_id={}&binlog_version={}", binlog_api_url,
|
||||
"get_binlog_info", request.remote_tablet_id, request.binlog_version);
|
||||
std::string binlog_info;
|
||||
auto get_binlog_info_cb = [&get_binlog_info_url, &binlog_info](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_binlog_info_url));
|
||||
client->set_timeout_ms(kMaxTimeoutMs);
|
||||
return client->execute(&binlog_info);
|
||||
};
|
||||
status = HttpClient::execute_with_retry(max_retry, 1, get_binlog_info_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get binlog info from " << get_binlog_info_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
auto ingest_binlog_func = [=, tstatus = &tstatus]() {
|
||||
IngestBinlogArg ingest_binlog_arg = {
|
||||
.txn_id = txn_id,
|
||||
.partition_id = partition_id,
|
||||
.local_tablet_id = local_tablet_id,
|
||||
.local_tablet = local_tablet,
|
||||
|
||||
std::vector<std::string> binlog_info_parts = strings::Split(binlog_info, ":");
|
||||
// TODO(Drogon): check binlog info content is right
|
||||
DCHECK(binlog_info_parts.size() == 2);
|
||||
const std::string& remote_rowset_id = binlog_info_parts[0];
|
||||
int64_t num_segments = std::stoll(binlog_info_parts[1]);
|
||||
|
||||
// Step 4: get rowset meta
|
||||
auto get_rowset_meta_url = fmt::format(
|
||||
"{}?method={}&tablet_id={}&rowset_id={}&binlog_version={}", binlog_api_url,
|
||||
"get_rowset_meta", request.remote_tablet_id, remote_rowset_id, request.binlog_version);
|
||||
std::string rowset_meta_str;
|
||||
auto get_rowset_meta_cb = [&get_rowset_meta_url, &rowset_meta_str](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_rowset_meta_url));
|
||||
client->set_timeout_ms(kMaxTimeoutMs);
|
||||
return client->execute(&rowset_meta_str);
|
||||
};
|
||||
status = HttpClient::execute_with_retry(max_retry, 1, get_rowset_meta_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get rowset meta from " << get_rowset_meta_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
RowsetMetaPB rowset_meta_pb;
|
||||
if (!rowset_meta_pb.ParseFromString(rowset_meta_str)) {
|
||||
LOG(WARNING) << "failed to parse rowset meta from " << get_rowset_meta_url;
|
||||
status = Status::InternalError("failed to parse rowset meta");
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
// rewrite rowset meta
|
||||
rowset_meta_pb.set_tablet_id(local_tablet_id);
|
||||
rowset_meta_pb.set_partition_id(partition_id);
|
||||
rowset_meta_pb.set_tablet_schema_hash(local_tablet->tablet_meta()->schema_hash());
|
||||
rowset_meta_pb.set_txn_id(txn_id);
|
||||
rowset_meta_pb.set_rowset_state(RowsetStatePB::COMMITTED);
|
||||
auto rowset_meta = std::make_shared<RowsetMeta>();
|
||||
if (!rowset_meta->init_from_pb(rowset_meta_pb)) {
|
||||
LOG(WARNING) << "failed to init rowset meta from " << get_rowset_meta_url;
|
||||
status = Status::InternalError("failed to init rowset meta");
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
|
||||
rowset_meta->set_rowset_id(new_rowset_id);
|
||||
rowset_meta->set_tablet_uid(local_tablet->tablet_uid());
|
||||
|
||||
// Step 5: get all segment files
|
||||
// Step 5.1: get all segment files size
|
||||
std::vector<std::string> segment_file_urls;
|
||||
segment_file_urls.reserve(num_segments);
|
||||
std::vector<uint64_t> segment_file_sizes;
|
||||
segment_file_sizes.reserve(num_segments);
|
||||
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
|
||||
auto get_segment_file_size_url = fmt::format(
|
||||
"{}?method={}&tablet_id={}&rowset_id={}&segment_index={}", binlog_api_url,
|
||||
"get_segment_file", request.remote_tablet_id, remote_rowset_id, segment_index);
|
||||
uint64_t segment_file_size;
|
||||
auto get_segment_file_size_cb = [&get_segment_file_size_url,
|
||||
&segment_file_size](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_segment_file_size_url));
|
||||
client->set_timeout_ms(kMaxTimeoutMs);
|
||||
RETURN_IF_ERROR(client->head());
|
||||
return client->get_content_length(&segment_file_size);
|
||||
.request = std::move(request),
|
||||
.tstatus = is_async ? nullptr : tstatus,
|
||||
};
|
||||
|
||||
status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_size_cb);
|
||||
_ingest_binlog(&ingest_binlog_arg);
|
||||
};
|
||||
|
||||
if (is_async) {
|
||||
status = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get segment file size from " << get_segment_file_size_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
|
||||
segment_file_sizes.push_back(segment_file_size);
|
||||
segment_file_urls.push_back(std::move(get_segment_file_size_url));
|
||||
} else {
|
||||
ingest_binlog_func();
|
||||
}
|
||||
}
|
||||
|
||||
// Step 5.2: check data capacity
|
||||
uint64_t total_size = std::accumulate(segment_file_sizes.begin(), segment_file_sizes.end(), 0);
|
||||
if (!local_tablet->can_add_binlog(total_size)) {
|
||||
LOG(WARNING) << "failed to add binlog, no enough space, total_size=" << total_size
|
||||
<< ", tablet=" << local_tablet->tablet_id();
|
||||
status = Status::InternalError("no enough space");
|
||||
status.to_thrift(&tstatus);
|
||||
void BackendService::query_ingest_binlog(TQueryIngestBinlogResult& result,
|
||||
const TQueryIngestBinlogRequest& request) {
|
||||
LOG(INFO) << "query ingest binlog. request: " << apache::thrift::ThriftDebugString(request);
|
||||
|
||||
auto set_result = [&](TIngestBinlogStatus::type status, std::string error_msg) {
|
||||
result.__set_status(status);
|
||||
result.__set_err_msg(std::move(error_msg));
|
||||
};
|
||||
|
||||
/// Check args: txn_id, partition_id, tablet_id, load_id
|
||||
if (!request.__isset.txn_id) {
|
||||
auto error_msg = "txn_id is empty";
|
||||
LOG(WARNING) << error_msg;
|
||||
set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
|
||||
return;
|
||||
}
|
||||
if (!request.__isset.partition_id) {
|
||||
auto error_msg = "partition_id is empty";
|
||||
LOG(WARNING) << error_msg;
|
||||
set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
|
||||
return;
|
||||
}
|
||||
if (!request.__isset.tablet_id) {
|
||||
auto error_msg = "tablet_id is empty";
|
||||
LOG(WARNING) << error_msg;
|
||||
set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
|
||||
return;
|
||||
}
|
||||
if (!request.__isset.load_id) {
|
||||
auto error_msg = "load_id is empty";
|
||||
LOG(WARNING) << error_msg;
|
||||
set_result(TIngestBinlogStatus::ANALYSIS_ERROR, error_msg);
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 5.3: get all segment files
|
||||
for (int64_t segment_index = 0; segment_index < num_segments; ++segment_index) {
|
||||
auto segment_file_size = segment_file_sizes[segment_index];
|
||||
auto get_segment_file_url = segment_file_urls[segment_index];
|
||||
auto partition_id = request.partition_id;
|
||||
auto txn_id = request.txn_id;
|
||||
auto tablet_id = request.tablet_id;
|
||||
|
||||
uint64_t estimate_timeout =
|
||||
segment_file_size / config::download_low_speed_limit_kbps / 1024;
|
||||
if (estimate_timeout < config::download_low_speed_time) {
|
||||
estimate_timeout = config::download_low_speed_time;
|
||||
}
|
||||
|
||||
auto local_segment_path = BetaRowset::segment_file_path(
|
||||
local_tablet->tablet_path(), rowset_meta->rowset_id(), segment_index);
|
||||
LOG(INFO) << fmt::format("download segment file from {} to {}", get_segment_file_url,
|
||||
local_segment_path);
|
||||
auto get_segment_file_cb = [&get_segment_file_url, &local_segment_path, segment_file_size,
|
||||
estimate_timeout](HttpClient* client) {
|
||||
RETURN_IF_ERROR(client->init(get_segment_file_url));
|
||||
client->set_timeout_ms(estimate_timeout * 1000);
|
||||
RETURN_IF_ERROR(client->download(local_segment_path));
|
||||
|
||||
std::error_code ec;
|
||||
// Check file length
|
||||
uint64_t local_file_size = std::filesystem::file_size(local_segment_path, ec);
|
||||
if (ec) {
|
||||
LOG(WARNING) << "download file error" << ec.message();
|
||||
return Status::IOError("can't retrive file_size of {}, due to {}",
|
||||
local_segment_path, ec.message());
|
||||
}
|
||||
if (local_file_size != segment_file_size) {
|
||||
LOG(WARNING) << "download file length error"
|
||||
<< ", get_segment_file_url=" << get_segment_file_url
|
||||
<< ", file_size=" << segment_file_size
|
||||
<< ", local_file_size=" << local_file_size;
|
||||
return Status::InternalError("downloaded file size is not equal");
|
||||
}
|
||||
chmod(local_segment_path.c_str(), S_IRUSR | S_IWUSR);
|
||||
return Status::OK();
|
||||
};
|
||||
|
||||
auto status = HttpClient::execute_with_retry(max_retry, 1, get_segment_file_cb);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "failed to get segment file from " << get_segment_file_url
|
||||
<< ", status=" << status.to_string();
|
||||
status.to_thrift(&tstatus);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// Step 6: create rowset && calculate delete bitmap && commit
|
||||
// Step 6.1: create rowset
RowsetSharedPtr rowset;
status = RowsetFactory::create_rowset(local_tablet->tablet_schema(),
                                      local_tablet->tablet_path(), rowset_meta, &rowset);
if (!status) {
    LOG(WARNING) << "failed to create rowset from rowset meta for remote tablet"
                 << ". rowset_id: " << rowset_meta_pb.rowset_id()
                 << ", rowset_type: " << rowset_meta_pb.rowset_type()
                 << ", remote_tablet_id=" << rowset_meta_pb.tablet_id() << ", txn_id=" << txn_id
                 << ", status=" << status.to_string();
    status.to_thrift(&tstatus);
    return;
}

// Step 6.2: calculate delete bitmap before commit
auto calc_delete_bitmap_token =
        StorageEngine::instance()->calc_delete_bitmap_executor()->create_token();
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(local_tablet_id);
RowsetIdUnorderedSet pre_rowset_ids;
if (local_tablet->enable_unique_key_merge_on_write()) {
    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
    std::vector<segment_v2::SegmentSharedPtr> segments;
    status = beta_rowset->load_segments(&segments);
    if (!status) {
        LOG(WARNING) << "failed to load segments from rowset"
                     << ". rowset_id: " << beta_rowset->rowset_id() << ", txn_id=" << txn_id
                     << ", status=" << status.to_string();
        status.to_thrift(&tstatus);
        return;
    }
    if (segments.size() > 1) {
        // calculate delete bitmap between segments
        status = local_tablet->calc_delete_bitmap_between_segments(rowset, segments,
                                                                   delete_bitmap);
        if (!status) {
            LOG(WARNING) << "failed to calculate delete bitmap"
                         << ". tablet_id: " << local_tablet->tablet_id()
                         << ". rowset_id: " << rowset->rowset_id() << ", txn_id=" << txn_id
                         << ", status=" << status.to_string();
            status.to_thrift(&tstatus);
            return;
        }
    }

    static_cast<void>(local_tablet->commit_phase_update_delete_bitmap(
            rowset, pre_rowset_ids, delete_bitmap, segments, txn_id,
            calc_delete_bitmap_token.get(), nullptr));
    static_cast<void>(calc_delete_bitmap_token->wait());
}

// Step 6.3: commit txn
Status commit_txn_status = StorageEngine::instance()->txn_manager()->commit_txn(
        local_tablet->data_dir()->get_meta(), rowset_meta->partition_id(),
        rowset_meta->txn_id(), rowset_meta->tablet_id(), local_tablet->tablet_uid(),
        rowset_meta->load_id(), rowset, false);
if (!commit_txn_status && !commit_txn_status.is<ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST>()) {
    auto err_msg = fmt::format(
            "failed to commit txn for remote tablet. rowset_id: {}, remote_tablet_id={}, "
            "txn_id={}, status={}",
            rowset_meta->rowset_id().to_string(), rowset_meta->tablet_id(),
            rowset_meta->txn_id(), commit_txn_status.to_string());
    LOG(WARNING) << err_msg;
    set_tstatus(TStatusCode::RUNTIME_ERROR, std::move(err_msg));
    return;
}

if (local_tablet->enable_unique_key_merge_on_write()) {
    StorageEngine::instance()->txn_manager()->set_txn_related_delete_bitmap(
            partition_id, txn_id, local_tablet_id, local_tablet->tablet_uid(), true,
            delete_bitmap, pre_rowset_ids, nullptr);
}

tstatus.__set_status_code(TStatusCode::OK);
}

// Step 1: get local tablet
auto local_tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
if (local_tablet == nullptr) {
    auto error_msg = fmt::format("tablet {} not found", tablet_id);
    LOG(WARNING) << error_msg;
    set_result(TIngestBinlogStatus::NOT_FOUND, std::move(error_msg));
    return;
}

// Step 2: get txn state
auto tablet_uid = local_tablet->tablet_uid();
auto txn_state = StorageEngine::instance()->txn_manager()->get_txn_state(partition_id, txn_id,
                                                                         tablet_id, tablet_uid);
switch (txn_state) {
case TxnState::NOT_FOUND:
    result.__set_status(TIngestBinlogStatus::NOT_FOUND);
    break;
case TxnState::PREPARED:
    result.__set_status(TIngestBinlogStatus::DOING);
    break;
case TxnState::COMMITTED:
    result.__set_status(TIngestBinlogStatus::OK);
    break;
case TxnState::ROLLEDBACK:
    result.__set_status(TIngestBinlogStatus::FAILED);
    break;
case TxnState::ABORTED:
    result.__set_status(TIngestBinlogStatus::FAILED);
    break;
case TxnState::DELETED:
    result.__set_status(TIngestBinlogStatus::FAILED);
    break;
}
}

} // namespace doris
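Roughly how the new is_async result flag and the _ingest_binlog_workers pool (declared in the header diff below) fit together — a minimal sketch, not the commit's exact code, with ingest_binlog_func standing in for the Step 1-6 work above:

    void BackendService::ingest_binlog(TIngestBinlogResult& result,
                                       const TIngestBinlogRequest& request) {
        auto ingest_binlog_func = [=]() {
            // Steps above: fetch the remote rowset, calc delete bitmap, commit txn.
        };
        if (_ingest_binlog_workers == nullptr) {
            ingest_binlog_func(); // no pool configured: run in the rpc thread
            result.__set_is_async(false);
            return;
        }
        // Queue the work and return immediately; callers poll query_ingest_binlog
        // until the txn leaves the DOING state.
        Status st = _ingest_binlog_workers->submit_func(std::move(ingest_binlog_func));
        result.__set_is_async(st.ok());
    }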
@ -58,6 +58,7 @@ class TTransmitDataParams;
class TUniqueId;
class TIngestBinlogRequest;
class TIngestBinlogResult;
class ThreadPool;

// This class just forwards rpc to the actual handler.
// We keep it as a separate class so that multiple services can be bound on a single point.

@ -137,10 +138,14 @@ public:

    void ingest_binlog(TIngestBinlogResult& result, const TIngestBinlogRequest& request) override;

    void query_ingest_binlog(TQueryIngestBinlogResult& result,
                             const TQueryIngestBinlogRequest& request) override;

private:
    Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);
    ExecEnv* _exec_env;
    std::unique_ptr<AgentServer> _agent_server;
    std::unique_ptr<ThreadPool> _ingest_binlog_workers;
};

} // namespace doris
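How _ingest_binlog_workers might be initialized with the ThreadPoolBuilder from util/threadpool.h — a minimal sketch, assuming the convention that a negative config::ingest_binlog_work_pool_size disables the pool and zero maps to hardware concurrency; the helper name is hypothetical:

    #include <thread>

    #include "util/threadpool.h"

    Status BackendService::_init_ingest_binlog_workers() { // hypothetical helper
        int num_threads = config::ingest_binlog_work_pool_size;
        if (num_threads < 0) {
            return Status::OK(); // pool disabled; ingest_binlog stays synchronous
        }
        if (num_threads == 0) {
            num_threads = static_cast<int>(std::thread::hardware_concurrency());
        }
        return ThreadPoolBuilder("ingest_binlog")
                .set_min_threads(num_threads)
                .set_max_threads(num_threads)
                .build(&_ingest_binlog_workers);
    }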
@ -17,6 +17,9 @@

#include "service/http_service.h"

#include <event2/bufferevent.h>
#include <event2/http.h>

#include <algorithm>
#include <string>
#include <vector>
@ -59,6 +62,30 @@
#include "util/doris_metrics.h"

namespace doris {
namespace {
std::shared_ptr<bufferevent_rate_limit_group> get_rate_limit_group(event_base* event_base) {
    auto rate_limit = config::download_binlog_rate_limit_kbs;
    if (rate_limit <= 0) {
        return nullptr;
    }

    auto max_value = std::numeric_limits<int32_t>::max() / 1024 * 10;
    if (rate_limit > max_value) {
        LOG(WARNING) << "rate limit is too large, set to max value.";
        rate_limit = max_value;
    }
    struct timeval cfg_tick = {0, 100 * 1000}; // 100ms
    rate_limit = rate_limit / 10 * 1024; // convert KB/s to bytes per 100ms tick

    auto token_bucket = std::unique_ptr<ev_token_bucket_cfg, decltype(&ev_token_bucket_cfg_free)>(
            ev_token_bucket_cfg_new(rate_limit, rate_limit * 2, rate_limit, rate_limit * 2,
                                    &cfg_tick),
            ev_token_bucket_cfg_free);
    return std::shared_ptr<bufferevent_rate_limit_group>(
            bufferevent_rate_limit_group_new(event_base, token_bucket.get()),
            bufferevent_rate_limit_group_free);
}
} // namespace
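To make the unit conversion concrete, here is the arithmetic for a hypothetical limit of 10240 KB/s (not a default value):

    // config::download_binlog_rate_limit_kbs = 10240 (hypothetical)
    constexpr int rate_limit_kbs = 10240;
    constexpr int bytes_per_tick = rate_limit_kbs / 10 * 1024; // 1048576 bytes per 100ms tick
    constexpr int burst_bytes = bytes_per_tick * 2;            // 2097152 bytes
    static_assert(bytes_per_tick == 1 << 20, "~1 MiB per tick, i.e. ~10 MiB/s sustained");

Because a single bufferevent_rate_limit_group is used, this budget is shared across all throttled downloads rather than applied per connection.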
HttpService::HttpService(ExecEnv* env, int port, int num_threads)
        : _env(env),
@ -72,6 +99,9 @@ HttpService::~HttpService() {
Status HttpService::start() {
    add_default_path_handlers(_web_page_handler.get());

    auto event_base = _ev_http_server->get_event_bases()[0];
    _rate_limit_group = get_rate_limit_group(event_base.get());

    // register load
    StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env));
    _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load",
@ -93,18 +123,19 @@ Status HttpService::start() {
    for (auto& path : _env->store_paths()) {
        allow_paths.emplace_back(path.path);
    }
    DownloadAction* download_action = _pool.add(new DownloadAction(_env, allow_paths));
    DownloadAction* download_action = _pool.add(new DownloadAction(_env, nullptr, allow_paths));
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action);
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action);

    DownloadAction* tablet_download_action = _pool.add(new DownloadAction(_env, allow_paths));
    DownloadAction* tablet_download_action =
            _pool.add(new DownloadAction(_env, _rate_limit_group, allow_paths));
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_tablet/_download",
                                      tablet_download_action);
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_tablet/_download",
                                      tablet_download_action);
    if (config::enable_single_replica_load) {
        DownloadAction* single_replica_download_action = _pool.add(new DownloadAction(
                _env, allow_paths, config::single_replica_load_download_num_workers));
                _env, nullptr, allow_paths, config::single_replica_load_download_num_workers));
        _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_single_replica/_download",
                                          single_replica_download_action);
        _ev_http_server->register_handler(HttpMethod::GET, "/api/_single_replica/_download",
@ -118,7 +149,8 @@ Status HttpService::start() {
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_load_error_log",
                                      error_log_download_action);

    DownloadBinlogAction* download_binlog_action = _pool.add(new DownloadBinlogAction(_env));
    DownloadBinlogAction* download_binlog_action =
            _pool.add(new DownloadBinlogAction(_env, _rate_limit_group));
    _ev_http_server->register_handler(HttpMethod::GET, "/api/_binlog/_download",
                                      download_binlog_action);
    _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_binlog/_download",
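Downstream of these registrations, the rate-limited download path is expected to attach each connection's bufferevent to the shared group. A sketch using the plain libevent API — the wrapper function and the way the request object is reached are assumptions; only the three libevent calls are the real API:

    #include <event2/bufferevent.h>
    #include <event2/http.h>

    // Hypothetical helper: join one HTTP connection to the shared token bucket.
    void throttle_download(evhttp_request* ev_req, bufferevent_rate_limit_group* group) {
        if (group == nullptr) {
            return; // rate limiting disabled (download_binlog_rate_limit_kbs <= 0)
        }
        evhttp_connection* evcon = evhttp_request_get_connection(ev_req);
        bufferevent* bev = evhttp_connection_get_bufferevent(evcon);
        bufferevent_add_to_rate_limit_group(bev, group);
    }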
@ -22,6 +22,8 @@
#include "common/object_pool.h"
#include "common/status.h"

struct bufferevent_rate_limit_group;

namespace doris {

class ExecEnv;
@ -47,6 +49,8 @@ private:
    std::unique_ptr<EvHttpServer> _ev_http_server;
    std::unique_ptr<WebPageHandler> _web_page_handler;

    std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};

    bool stopped = false;
};
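One note on the forward declaration: std::shared_ptr type-erases its deleter when the pointer is created, so the header can hold the member with only the incomplete type and no libevent include. A minimal illustration (the class name is invented for the example):

    #include <memory>

    struct bufferevent_rate_limit_group; // incomplete here, as in http_service.h

    class RateLimitedService {
        // Well-formed: a shared_ptr can be declared and destroyed with an
        // incomplete type; the deleter was captured where the type was complete.
        std::shared_ptr<bufferevent_rate_limit_group> _rate_limit_group {nullptr};
    };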
@ -34,6 +34,8 @@ import org.apache.doris.thrift.TIngestBinlogResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TQueryIngestBinlogRequest;
import org.apache.doris.thrift.TQueryIngestBinlogResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@ -229,6 +231,12 @@ public class GenericPoolTest {
        public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest ingestBinlogRequest) throws TException {
            return null;
        }

        @Override
        public TQueryIngestBinlogResult queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
                throws TException {
            return null;
        }
    }

    @Test
@ -52,6 +52,8 @@ import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPublishTopicRequest;
import org.apache.doris.thrift.TPublishTopicResult;
import org.apache.doris.thrift.TQueryIngestBinlogRequest;
import org.apache.doris.thrift.TQueryIngestBinlogResult;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@ -375,6 +377,12 @@ public class MockedBackendFactory {
        public TIngestBinlogResult ingestBinlog(TIngestBinlogRequest ingestBinlogRequest) throws TException {
            return null;
        }

        @Override
        public TQueryIngestBinlogResult queryIngestBinlog(TQueryIngestBinlogRequest queryIngestBinlogRequest)
                throws TException {
            return null;
        }
    }

    // The default Brpc service.
@ -136,6 +136,28 @@ struct TIngestBinlogRequest {

struct TIngestBinlogResult {
    1: optional Status.TStatus status;
    2: optional bool is_async;
}

struct TQueryIngestBinlogRequest {
    1: optional i64 txn_id;
    2: optional i64 partition_id;
    3: optional i64 tablet_id;
    4: optional Types.TUniqueId load_id;
}

enum TIngestBinlogStatus {
    ANALYSIS_ERROR,
    UNKNOWN,
    NOT_FOUND,
    OK,
    FAILED,
    DOING
}

struct TQueryIngestBinlogResult {
    1: optional TIngestBinlogStatus status;
    2: optional string err_msg;
}

enum TTopicInfoType {
@ -211,6 +233,7 @@ service BackendService {
    TCheckStorageFormatResult check_storage_format();

    TIngestBinlogResult ingest_binlog(1: TIngestBinlogRequest ingest_binlog_request);
    TQueryIngestBinlogResult query_ingest_binlog(1: TQueryIngestBinlogRequest query_ingest_binlog_request);

    TPublishTopicResult publish_topic_info(1:TPublishTopicRequest topic_request);
}
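A caller-side sketch of the intended protocol (assumed usage, not code from this commit): once ingest_binlog() reports is_async == true, the caller polls query_ingest_binlog until the txn leaves DOING. BackendServiceClient is the thrift-generated C++ client; transport setup and the backoff interval are placeholders:

    #include <chrono>
    #include <thread>

    TIngestBinlogStatus::type wait_ingest_done(BackendServiceClient& client,
                                               const TQueryIngestBinlogRequest& request) {
        TQueryIngestBinlogResult result;
        do {
            std::this_thread::sleep_for(std::chrono::milliseconds(500)); // assumed backoff
            client.query_ingest_binlog(result, request);
        } while (result.status == TIngestBinlogStatus::DOING);
        return result.status; // OK, FAILED, or NOT_FOUND
    }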
@ -72,7 +72,6 @@ class Syncer {
    }

    private Boolean checkBinlog(TBinlog binlog, String table, Boolean update) {

        // step 1: check binlog availability
        if (binlog == null) {
            return false
@ -735,6 +734,7 @@ class Syncer {
            if (!binlogRecords.contains(srcPartition.key)) {
                continue
            }

            Iterator srcTabletIter = srcPartition.value.tabletMeta.iterator()
            Iterator tarTabletIter = tarPartition.value.tabletMeta.iterator()

@ -771,6 +771,7 @@ class Syncer {
        logger.info("request -> ${request}")
        TIngestBinlogResult result = tarClient.client.ingestBinlog(request)
        if (!checkIngestBinlog(result)) {
            logger.error("Ingest binlog error! result: ${result}")
            return false
        }
@ -22,6 +22,7 @@ suite("test_ingest_binlog") {
        logger.info("fe enable_feature_binlog is false, skip case test_ingest_binlog")
        return
    }

    def tableName = "tbl_ingest_binlog"
    def insert_num = 5
    def test_num = 0
@ -102,6 +103,7 @@ suite("test_ingest_binlog") {
    logger.info("=== Test 2.2: Wrong binlog version case ===")
    // -1 means use the number of syncer.context
    // Boolean ingestBinlog(long fakePartitionId = -1, long fakeVersion = -1)
    // use fakeVersion = 1: version 1 is a doris be tablet's first version, so there is no binlog, only an http error
    assertTrue(syncer.ingestBinlog(-1, 1) == false)

@ -120,4 +122,4 @@ suite("test_ingest_binlog") {

    // End Test 2
    syncer.closeBackendClients()
}
}