[refactor](load) Remove mini load (#10520)

yiguolei
2022-06-30 23:21:41 +08:00
committed by GitHub
parent 18ad8ebfbb
commit aab7dc956f
30 changed files with 5 additions and 2694 deletions

View File

@@ -29,7 +29,6 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/snapshot_manager.h"
#include "runtime/etl_job_mgr.h"
using std::string;
using std::vector;
@@ -247,45 +246,4 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}
void AgentServer::submit_etl_task(TAgentResult& t_agent_result,
const TMiniLoadEtlTaskRequest& request) {
Status status = _exec_env->etl_job_mgr()->start_job(request);
auto fragment_instance_id = request.params.params.fragment_instance_id;
if (status.ok()) {
VLOG_RPC << "success to submit etl task. id=" << fragment_instance_id;
} else {
VLOG_RPC << "fail to submit etl task. id=" << fragment_instance_id
<< ", err_msg=" << status.get_error_msg();
}
status.to_thrift(&t_agent_result.status);
}
void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
const TMiniLoadEtlStatusRequest& request) {
Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
if (!status.ok()) {
LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]";
}
VLOG_RPC << "success to get job state. [id=" << request.mini_load_id
<< ", status=" << t_agent_result.status.status_code
<< ", etl_state=" << t_agent_result.etl_state << ", files=";
for (auto& item : t_agent_result.file_map) {
VLOG_RPC << item.first << ":" << item.second << ";";
}
VLOG_RPC << "]";
}
void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
const TDeleteEtlFilesRequest& request) {
Status status = _exec_env->etl_job_mgr()->erase_job(request);
if (!status.ok()) {
LOG(WARNING) << "fail to delete etl files. because " << status.get_error_msg()
<< " with request " << request;
}
VLOG_RPC << "success to delete etl files. request=" << request;
status.to_thrift(&t_agent_result.status);
}
} // namespace doris

View File

@@ -47,12 +47,6 @@ public:
// [[deprecated]]
void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request);
// Multi-Load will still use the following 3 methods for now.
void submit_etl_task(TAgentResult& agent_result, const TMiniLoadEtlTaskRequest& request);
void get_etl_status(TMiniLoadEtlStatusResult& agent_result,
const TMiniLoadEtlStatusRequest& request);
void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& request);
private:
DISALLOW_COPY_AND_ASSIGN(AgentServer);

View File

@@ -34,7 +34,6 @@ add_library(Webserver STATIC
ev_http_server.cpp
http_client.cpp
action/download_action.cpp
action/mini_load.cpp
action/monitor_action.cpp
action/health_action.cpp
action/tablet_migration_action.cpp

View File

@@ -1,966 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "http/action/mini_load.h"
#include <event2/buffer.h>
#include <event2/bufferevent.h>
#include <event2/http.h>
#include <fcntl.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <time.h>
#include <unistd.h>
#include <functional>
#include <mutex>
#include <sstream>
#include <string>
#include "agent/cgroups_mgr.h"
#include "common/status.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/MasterService_types.h"
#include "http/http_channel.h"
#include "http/http_headers.h"
#include "http/http_parser.h"
#include "http/http_request.h"
#include "http/http_response.h"
#include "http/http_status.h"
#include "http/utils.h"
#include "olap/file_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "service/backend_options.h"
#include "util/file_utils.h"
#include "util/json_util.h"
#include "util/string_parser.hpp"
#include "util/string_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"
#include "util/url_coding.h"
namespace doris {
// context used to handle mini-load in asynchronous mode
struct MiniLoadAsyncCtx {
MiniLoadAsyncCtx(MiniLoadAction* handler_) : handler(handler_) {}
~MiniLoadAsyncCtx() {
if (need_remove_handle) {
handler->erase_handle(load_handle);
}
if (fd >= 0) {
::close(fd);
}
}
MiniLoadAction* handler;
// used to check duplicate
LoadHandle load_handle;
bool need_remove_handle = false;
// file to save
std::string file_path;
int fd = -1;
size_t body_bytes = 0;
size_t bytes_written = 0;
TLoadCheckRequest load_check_req;
};
struct MiniLoadCtx {
MiniLoadCtx(bool is_streaming_) : is_streaming(is_streaming_) {}
bool is_streaming = false;
MiniLoadAsyncCtx* mini_load_async_ctx = nullptr;
StreamLoadContext* stream_load_ctx = nullptr;
};
const std::string CLUSTER_KEY = "cluster";
const std::string DB_KEY = "db";
const std::string TABLE_KEY = "table";
const std::string LABEL_KEY = "label";
const std::string SUB_LABEL_KEY = "sub_label";
const std::string FILE_PATH_KEY = "file_path";
const std::string COLUMNS_KEY = "columns";
const std::string HLL_KEY = "hll";
const std::string COLUMN_SEPARATOR_KEY = "column_separator";
const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio";
const std::string STRICT_MODE_KEY = "strict_mode";
const std::string TIMEOUT_KEY = "timeout";
const char* k_100_continue = "100-continue";
MiniLoadAction::MiniLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {}
static bool is_name_valid(const std::string& name) {
return !name.empty();
}
static Status check_request(HttpRequest* req) {
auto& params = *req->params();
// check params
if (!is_name_valid(params[DB_KEY])) {
return Status::InternalError("Database name is not valid.");
}
if (!is_name_valid(params[TABLE_KEY])) {
return Status::InternalError("Table name is not valid.");
}
if (!is_name_valid(params[LABEL_KEY])) {
return Status::InternalError("Label name is not valid.");
}
return Status::OK();
}
Status MiniLoadAction::data_saved_dir(const LoadHandle& desc, const std::string& table,
std::string* file_path) {
std::string prefix;
RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(desc.db, desc.label, &prefix));
timeval tv;
gettimeofday(&tv, nullptr);
struct tm tm;
time_t cur_sec = tv.tv_sec;
localtime_r(&cur_sec, &tm);
char buf[64];
strftime(buf, 64, "%Y%m%d%H%M%S", &tm);
std::stringstream ss;
ss << prefix << "/" << table << "." << desc.sub_label << "." << buf << "." << tv.tv_usec;
*file_path = ss.str();
return Status::OK();
}
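For reference, a standalone sketch of the file naming scheme above; the prefix is a hypothetical stand-in for whatever LoadPathMgr::allocate_dir() returns:
// Sketch only, not part of this diff.
#include <sys/time.h>
#include <ctime>
#include <iostream>
#include <sstream>
#include <string>
int main() {
std::string prefix = "/data/load/example_db/label1"; // hypothetical prefix
std::string table = "tbl";
std::string sub_label = "sub1";
timeval tv;
gettimeofday(&tv, nullptr);
struct tm tm;
time_t cur_sec = tv.tv_sec;
localtime_r(&cur_sec, &tm);
char buf[64];
strftime(buf, 64, "%Y%m%d%H%M%S", &tm);
std::stringstream ss;
ss << prefix << "/" << table << "." << sub_label << "." << buf << "." << tv.tv_usec;
// prints e.g. /data/load/example_db/label1/tbl.sub1.20220630232141.123456
std::cout << ss.str() << std::endl;
return 0;
}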
Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path,
const std::string& user, const std::string& cluster,
int64_t file_size) {
// Prepare request parameters.
std::map<std::string, std::string> params(http_req->query_params().begin(),
http_req->query_params().end());
RETURN_IF_ERROR(_merge_header(http_req, &params));
params.erase(LABEL_KEY);
params.erase(SUB_LABEL_KEY);
// put here to log master information
const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
Status status;
FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
config::thrift_rpc_timeout_ms, &status);
if (!status.ok()) {
std::stringstream ss;
ss << "Connect master failed, with address(" << master_address.hostname << ":"
<< master_address.port << ")";
LOG(WARNING) << ss.str();
return status;
}
TFeResult res;
try {
TMiniLoadRequest req;
req.protocolVersion = FrontendServiceVersion::V1;
req.__set_db(http_req->param(DB_KEY));
if (!cluster.empty()) {
req.__set_cluster(cluster);
}
req.__set_tbl(http_req->param(TABLE_KEY));
req.__set_label(http_req->param(LABEL_KEY));
req.__set_user(user);
// Belong to a multi-load transaction
if (!http_req->param(SUB_LABEL_KEY).empty()) {
req.__set_subLabel(http_req->param(SUB_LABEL_KEY));
}
req.__set_properties(params);
req.files.push_back(file_path);
req.__isset.file_size = true;
req.file_size.push_back(file_size);
req.backend.__set_hostname(BackendOptions::get_localhost());
req.backend.__set_port(config::be_port);
req.__set_timestamp(GetCurrentTimeMicros());
try {
client->miniLoad(res, req);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":"
<< master_address.port << ") because: " << e.what();
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
<< ":" << master_address.port << ")";
return status;
}
client->miniLoad(res, req);
} catch (apache::thrift::TApplicationException& e) {
LOG(WARNING) << "mini load request from master(" << master_address.hostname << ":"
<< master_address.port << ") got unknown result: " << e.what();
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
<< ":" << master_address.port << ")";
return status;
}
client->miniLoad(res, req);
}
} catch (apache::thrift::TException& e) {
// failed when retry.
// reopen to disable this connection
client.reopen(config::thrift_rpc_timeout_ms);
std::stringstream ss;
ss << "Request miniload from master(" << master_address.hostname << ":"
<< master_address.port << ") because: " << e.what();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
return Status(res.status);
}
Status MiniLoadAction::_merge_header(HttpRequest* http_req,
std::map<std::string, std::string>* params) {
if (http_req == nullptr || params == nullptr) {
return Status::OK();
}
if (!http_req->header(HTTP_FORMAT_KEY).empty()) {
(*params)[HTTP_FORMAT_KEY] = http_req->header(HTTP_FORMAT_KEY);
}
if (!http_req->header(HTTP_COLUMNS).empty()) {
(*params)[HTTP_COLUMNS] = http_req->header(HTTP_COLUMNS);
}
if (!http_req->header(HTTP_WHERE).empty()) {
(*params)[HTTP_WHERE] = http_req->header(HTTP_WHERE);
}
if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) {
(*params)[HTTP_COLUMN_SEPARATOR] = http_req->header(HTTP_COLUMN_SEPARATOR);
}
if (!http_req->header(HTTP_PARTITIONS).empty()) {
(*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS);
if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
return Status::InvalidArgument(
"Can not specify both partitions and temporary partitions");
}
}
if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) {
(*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS);
if (!http_req->header(HTTP_PARTITIONS).empty()) {
return Status::InvalidArgument(
"Can not specify both partitions and temporary partitions");
}
}
if (!http_req->header(HTTP_NEGATIVE).empty() &&
iequal(http_req->header(HTTP_NEGATIVE), "true")) {
(*params)[HTTP_NEGATIVE] = "true";
} else {
(*params)[HTTP_NEGATIVE] = "false";
}
if (!http_req->header(HTTP_STRICT_MODE).empty()) {
if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
(*params)[HTTP_STRICT_MODE] = "false";
} else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
(*params)[HTTP_STRICT_MODE] = "true";
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
}
}
if (!http_req->header(HTTP_TIMEZONE).empty()) {
(*params)[HTTP_TIMEZONE] = http_req->header(HTTP_TIMEZONE);
}
if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) {
(*params)[HTTP_EXEC_MEM_LIMIT] = http_req->header(HTTP_EXEC_MEM_LIMIT);
}
if (!http_req->header(HTTP_JSONPATHS).empty()) {
(*params)[HTTP_JSONPATHS] = http_req->header(HTTP_JSONPATHS);
}
if (!http_req->header(HTTP_JSONROOT).empty()) {
(*params)[HTTP_JSONROOT] = http_req->header(HTTP_JSONROOT);
}
if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
(*params)[HTTP_STRIP_OUTER_ARRAY] = "true";
} else {
(*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
}
} else {
(*params)[HTTP_STRIP_OUTER_ARRAY] = "false";
}
if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
(*params)[HTTP_FUZZY_PARSE] = "true";
} else {
(*params)[HTTP_FUZZY_PARSE] = "false";
}
} else {
(*params)[HTTP_FUZZY_PARSE] = "false";
}
if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
(*params)[HTTP_READ_JSON_BY_LINE] = "true";
} else {
(*params)[HTTP_READ_JSON_BY_LINE] = "false";
}
} else {
(*params)[HTTP_READ_JSON_BY_LINE] = "false";
}
if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) {
(*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] =
http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL);
}
if (params->find(HTTP_MERGE_TYPE) == params->end()) {
params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND"));
}
StringCaseMap<TMergeType::type> merge_type_map = {{"APPEND", TMergeType::APPEND},
{"DELETE", TMergeType::DELETE},
{"MERGE", TMergeType::MERGE}};
if (!http_req->header(HTTP_MERGE_TYPE).empty()) {
std::string merge_type = http_req->header(HTTP_MERGE_TYPE);
auto it = merge_type_map.find(merge_type);
if (it != merge_type_map.end()) {
(*params)[HTTP_MERGE_TYPE] = it->first;
} else {
return Status::InvalidArgument("Invalid merge type " + merge_type);
}
}
if (!http_req->header(HTTP_DELETE_CONDITION).empty()) {
if ((*params)[HTTP_MERGE_TYPE] == "MERGE") {
(*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION);
} else {
return Status::InvalidArgument("not support delete when merge type is " +
(*params)[HTTP_MERGE_TYPE] + ".");
}
}
return Status::OK();
}
static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd,
std::string* cluster) {
std::string decoded_auth;
if (!base64_decode(auth, &decoded_auth)) {
return false;
}
std::string::size_type pos = decoded_auth.find(':');
if (pos == std::string::npos) {
return false;
}
user->assign(decoded_auth.c_str(), pos);
passwd->assign(decoded_auth.c_str() + pos + 1);
const std::string::size_type cluster_pos = user->find('@');
if (cluster_pos != std::string::npos) {
cluster->assign(user->c_str(), cluster_pos + 1, pos - cluster_pos - 1);
user->assign(user->c_str(), cluster_pos);
}
return true;
}
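A usage sketch with a hypothetical credential: "dXNlckBjbHVzdGVyOnBhc3N3ZA==" is base64 for "user@cluster:passwd", so the optional @cluster suffix is split out of the user name:
// Sketch only: exercising parse_auth() above with hypothetical input.
std::string user, passwd, cluster;
if (parse_auth("dXNlckBjbHVzdGVyOnBhc3N3ZA==", &user, &passwd, &cluster)) {
// user == "user", cluster == "cluster", passwd == "passwd"
}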
Status MiniLoadAction::check_auth(const HttpRequest* http_req,
const TLoadCheckRequest& check_load_req) {
// put here to log master information
const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
Status status;
FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
config::thrift_rpc_timeout_ms, &status);
if (!status.ok()) {
std::stringstream ss;
ss << "Connect master failed, with address(" << master_address.hostname << ":"
<< master_address.port << ")";
LOG(WARNING) << ss.str();
return status;
}
TFeResult res;
try {
try {
client->loadCheck(res, check_load_req);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":"
<< master_address.port << ") because: " << e.what();
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
<< ":" << master_address.port << ")";
return status;
}
client->loadCheck(res, check_load_req);
} catch (apache::thrift::TApplicationException& e) {
LOG(WARNING) << "load check request from master(" << master_address.hostname << ":"
<< master_address.port << ") got unknown result: " << e.what();
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname
<< ":" << master_address.port << ")";
return status;
}
client->loadCheck(res, check_load_req);
}
} catch (apache::thrift::TException& e) {
// failed when retry.
// reopen to disable this connection
client.reopen(config::thrift_rpc_timeout_ms);
std::stringstream ss;
ss << "Request miniload from master(" << master_address.hostname << ":"
<< master_address.port << ") because: " << e.what();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
return Status(res.status);
}
void MiniLoadAction::erase_handle(const LoadHandle& desc) {
// remove
std::lock_guard<std::mutex> l(_lock);
_current_load.erase(desc);
}
int MiniLoadAction::on_header(HttpRequest* req) {
// check authorization first, make client know what happened
if (req->header(HttpHeaders::AUTHORIZATION).empty()) {
HttpChannel::send_basic_challenge(req, "mini_load");
return -1;
}
Status status;
MiniLoadCtx* mini_load_ctx = new MiniLoadCtx(_is_streaming(req));
req->set_handler_ctx(mini_load_ctx);
if (((MiniLoadCtx*)req->handler_ctx())->is_streaming) {
status = _on_new_header(req);
StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx;
if (ctx != nullptr) {
ctx->status = status;
}
} else {
status = _on_header(req);
}
if (!status.ok()) {
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.get_error_msg());
return -1;
}
return 0;
}
bool MiniLoadAction::_is_streaming(HttpRequest* req) {
// multi load must be non-streaming
if (!req->param(SUB_LABEL_KEY).empty()) {
return false;
}
TIsMethodSupportedRequest request;
request.__set_function_name(_streaming_function_name);
const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
TFeResult res;
Status status = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_address.hostname, master_address.port,
[&request, &res](FrontendServiceConnection& client) {
client->isMethodSupported(res, request);
});
if (!status.ok()) {
std::stringstream ss;
ss << "This mini load is not streaming because: " << status.get_error_msg()
<< " with address(" << master_address.hostname << ":" << master_address.port << ")";
LOG(INFO) << ss.str();
return false;
}
status = Status(res.status);
if (!status.ok()) {
std::stringstream ss;
ss << "This streaming mini load is not be supportd because: " << status.get_error_msg()
<< " with address(" << master_address.hostname << ":" << master_address.port << ")";
LOG(INFO) << ss.str();
return false;
}
return true;
}
Status MiniLoadAction::_on_header(HttpRequest* req) {
size_t body_bytes = 0;
size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
if (body_bytes > max_body_bytes) {
std::stringstream ss;
ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes;
return Status::InternalError(ss.str());
}
} else {
evhttp_connection_set_max_body_size(
evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes);
}
RETURN_IF_ERROR(check_request(req));
std::unique_ptr<MiniLoadAsyncCtx> mini_load_async_ctx(new MiniLoadAsyncCtx(this));
mini_load_async_ctx->body_bytes = body_bytes;
mini_load_async_ctx->load_handle.db = req->param(DB_KEY);
mini_load_async_ctx->load_handle.label = req->param(LABEL_KEY);
mini_load_async_ctx->load_handle.sub_label = req->param(SUB_LABEL_KEY);
// check if duplicate
// Use this to prevent two callback functions from writing to one file,
// which could corrupt the file
{
std::lock_guard<std::mutex> l(_lock);
if (_current_load.find(mini_load_async_ctx->load_handle) != _current_load.end()) {
return Status::InternalError("Duplicate mini load request.");
}
_current_load.insert(mini_load_async_ctx->load_handle);
mini_load_async_ctx->need_remove_handle = true;
}
// generate load check request
RETURN_IF_ERROR(generate_check_load_req(req, &mini_load_async_ctx->load_check_req));
// Check auth
RETURN_IF_ERROR(check_auth(req, mini_load_async_ctx->load_check_req));
// Receive data first, keep things easy.
RETURN_IF_ERROR(data_saved_dir(mini_load_async_ctx->load_handle, req->param(TABLE_KEY),
&mini_load_async_ctx->file_path));
// destructor will close the file handle, not depend on DeferOp any more
mini_load_async_ctx->fd =
open(mini_load_async_ctx->file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0660);
if (mini_load_async_ctx->fd < 0) {
char buf[64];
LOG(WARNING) << "open file failed, path=" << mini_load_async_ctx->file_path
<< ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, sizeof(buf));
return Status::InternalError("open file failed");
}
((MiniLoadCtx*)req->handler_ctx())->mini_load_async_ctx = mini_load_async_ctx.release();
return Status::OK();
}
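The cleanup for the duplicate check above is RAII-driven: need_remove_handle is only set after the handle is inserted under the lock, and ~MiniLoadAsyncCtx (top of this file) erases it again, so every early-error return after this point releases the handle automatically. A minimal sketch of the same pattern, with hypothetical names:
#include <mutex>
#include <set>
#include <string>
// Simplified guard, illustrative only.
struct HandleGuard {
std::set<std::string>& live; // stands in for _current_load
std::mutex& lock;            // stands in for _lock
std::string handle;
bool owned = false;
bool try_acquire() {
std::lock_guard<std::mutex> l(lock);
owned = live.insert(handle).second; // false means a duplicate request
return owned;
}
~HandleGuard() {
if (owned) {
std::lock_guard<std::mutex> l(lock);
live.erase(handle);
}
}
};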
void MiniLoadAction::on_chunk_data(HttpRequest* http_req) {
MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx();
if (ctx->is_streaming) {
_on_new_chunk_data(http_req);
} else {
_on_chunk_data(http_req);
}
}
void MiniLoadAction::_on_chunk_data(HttpRequest* http_req) {
MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx;
if (ctx == nullptr) {
return;
}
struct evhttp_request* ev_req = http_req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);
char buf[4096];
while (evbuffer_get_length(evbuf) > 0) {
auto n = evbuffer_remove(evbuf, buf, sizeof(buf));
const char* pos = buf;
while (n > 0) {
auto res = write(ctx->fd, pos, n);
if (res < 0) {
char errbuf[64];
LOG(WARNING) << "write file failed, path=" << ctx->file_path << ", errno=" << errno
<< ", errmsg=" << strerror_r(errno, errbuf, sizeof(errbuf));
HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR,
"write file failed");
delete ctx;
http_req->set_handler_ctx(nullptr);
return;
}
// advance past what was written so a partial write does not resend old bytes
pos += res;
n -= res;
ctx->bytes_written += res;
}
}
}
void MiniLoadAction::_on_new_chunk_data(HttpRequest* http_req) {
StreamLoadContext* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->stream_load_ctx;
if (ctx == nullptr || !ctx->status.ok()) {
return;
}
struct evhttp_request* ev_req = http_req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);
while (evbuffer_get_length(evbuf) > 0) {
auto bb = ByteBuffer::allocate(4096);
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
auto st = ctx->body_sink->append(bb);
if (!st.ok()) {
LOG(WARNING) << "append body content failed. errmsg=" << st.get_error_msg()
<< ctx->brief();
ctx->status = st;
return;
}
ctx->receive_bytes += remove_bytes;
}
}
void MiniLoadAction::free_handler_ctx(void* param) {
MiniLoadCtx* ctx = (MiniLoadCtx*)param;
if (ctx->is_streaming) {
StreamLoadContext* streaming_ctx = ((MiniLoadCtx*)param)->stream_load_ctx;
if (streaming_ctx != nullptr) {
// sender is going, make receiver know it
if (streaming_ctx->body_sink != nullptr) {
LOG(WARNING) << "cancel stream load " << streaming_ctx->id.to_string()
<< " because sender failed";
streaming_ctx->body_sink->cancel("sender failed");
}
if (streaming_ctx->unref()) {
delete streaming_ctx;
}
}
} else {
MiniLoadAsyncCtx* async_ctx = ((MiniLoadCtx*)param)->mini_load_async_ctx;
delete async_ctx;
}
delete ctx;
}
void MiniLoadAction::handle(HttpRequest* http_req) {
MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx();
if (ctx->is_streaming) {
_new_handle(http_req);
} else {
_handle(http_req);
}
}
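Every callback in this handler dispatches on MiniLoadCtx::is_streaming the same way; for reference, the callback order over one request, summarized from this file:
// on_header()        -> _on_new_header() (streaming: begin txn, open pipe, submit plan)
//                       or _on_header()  (non-streaming: spool the body to a local file)
// on_chunk_data()    -> _on_new_chunk_data() (append ByteBuffers to the pipe)
//                       or _on_chunk_data()  (write() chunks to the spool file)
// handle()           -> _new_handle() (finish the pipe, commit or roll back the txn)
//                       or _handle()  (submit the spooled file via the miniLoad RPC)
// free_handler_ctx() -> deletes MiniLoadAsyncCtx or unrefs the StreamLoadContext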
void MiniLoadAction::_handle(HttpRequest* http_req) {
MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx;
if (ctx == nullptr) {
// when ctx is nullptr, an error must have happened in on_chunk_data
// and a reply has already been sent; just return without doing anything
LOG(WARNING) << "handler context is nullptr when MiniLoad callback execute, uri="
<< http_req->uri();
return;
}
if (ctx->body_bytes > 0 && ctx->bytes_written != ctx->body_bytes) {
LOG(WARNING) << "bytes written is not equal with body size, uri=" << http_req->uri()
<< ", body_bytes=" << ctx->body_bytes
<< ", bytes_written=" << ctx->bytes_written;
HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR,
"receipt size not equal with body size");
return;
}
auto st = _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster,
ctx->bytes_written);
std::string str = st.to_json();
HttpChannel::send_reply(http_req, str);
}
Status MiniLoadAction::generate_check_load_req(const HttpRequest* http_req,
TLoadCheckRequest* check_load_req) {
const char k_basic[] = "Basic ";
const std::string& auth = http_req->header(HttpHeaders::AUTHORIZATION);
if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) {
return Status::InternalError("Not support Basic authorization.");
}
check_load_req->protocolVersion = FrontendServiceVersion::V1;
// Skip "Basic "
std::string str = auth.substr(sizeof(k_basic) - 1);
std::string cluster;
if (!parse_auth(str, &(check_load_req->user), &(check_load_req->passwd), &cluster)) {
LOG(WARNING) << "parse auth string failed." << auth << " and str " << str;
return Status::InternalError("Parse authorization failed.");
}
if (!cluster.empty()) {
check_load_req->__set_cluster(cluster);
}
check_load_req->db = http_req->param(DB_KEY);
check_load_req->__set_tbl(http_req->param(TABLE_KEY));
if (http_req->param(SUB_LABEL_KEY).empty()) {
check_load_req->__set_label(http_req->param(LABEL_KEY));
check_load_req->__set_timestamp(GetCurrentTimeMicros());
}
if (http_req->remote_host() != nullptr) {
std::string user_ip(http_req->remote_host());
check_load_req->__set_user_ip(user_ip);
}
return Status::OK();
}
bool LoadHandleCmp::operator()(const LoadHandle& lhs, const LoadHandle& rhs) const {
int ret = lhs.label.compare(rhs.label);
if (ret < 0) {
return true;
} else if (ret > 0) {
return false;
}
ret = lhs.sub_label.compare(rhs.sub_label);
if (ret < 0) {
return true;
} else if (ret > 0) {
return false;
}
ret = lhs.db.compare(rhs.db);
if (ret < 0) {
return true;
}
return false;
}
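The comparator is a hand-rolled lexicographic order over (label, sub_label, db); an equivalent formulation with std::tie, shown only for clarity:
#include <tuple>
// Sketch, not part of the original code.
struct LoadHandleTieCmp {
bool operator()(const LoadHandle& lhs, const LoadHandle& rhs) const {
return std::tie(lhs.label, lhs.sub_label, lhs.db) <
std::tie(rhs.label, rhs.sub_label, rhs.db);
}
};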
// fe will begin the txn and record the metadata of load
Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
// prepare begin mini load request params
TMiniLoadBeginRequest request;
set_request_auth(&request, ctx->auth);
request.db = ctx->db;
request.tbl = ctx->table;
request.label = ctx->label;
if (!ctx->sub_label.empty()) {
request.__set_sub_label(ctx->sub_label);
}
if (ctx->timeout_second != -1) {
request.__set_timeout_second(ctx->timeout_second);
}
if (ctx->max_filter_ratio != 0.0) {
request.__set_max_filter_ratio(ctx->max_filter_ratio);
}
request.__set_create_timestamp(UnixMillis());
request.__set_request_id(ctx->id.to_thrift());
// begin load by master
const TNetworkAddress& master_addr = _exec_env->master_info()->network_address;
TMiniLoadBeginResult res;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &res](FrontendServiceConnection& client) {
client->miniLoadBegin(res, request);
}));
Status begin_status(res.status);
if (!begin_status.ok()) {
LOG(INFO) << "failed to begin mini load " << ctx->label
<< " with error msg:" << begin_status.get_error_msg();
return begin_status;
}
ctx->txn_id = res.txn_id;
// txn has been begun in fe
ctx->need_rollback = true;
LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe";
return Status::OK();
}
Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
// prepare request parameters
TStreamLoadPutRequest put_request;
set_request_auth(&put_request, ctx->auth);
put_request.db = ctx->db;
put_request.tbl = ctx->table;
put_request.txnId = ctx->txn_id;
put_request.formatType = ctx->format;
put_request.__set_loadId(ctx->id.to_thrift());
put_request.fileType = TFileType::FILE_STREAM;
std::map<std::string, std::string> params(req->query_params().begin(),
req->query_params().end());
/* merge params of columns and hll
* for example:
* input: columns=c1,tmp_c2,tmp_c3\&hll=hll_c2,tmp_c2:hll_c3,tmp_c3
* output: columns=c1,tmp_c2,tmp_c3,hll_c2=hll_hash(tmp_c2),hll_c3=hll_hash(tmp_c3)
*/
auto columns_it = params.find(COLUMNS_KEY);
if (columns_it != params.end()) {
std::string columns_value = columns_it->second;
auto hll_it = params.find(HLL_KEY);
if (hll_it != params.end()) {
std::string hll_value = hll_it->second;
if (hll_value.empty()) {
return Status::InvalidArgument(
"Hll value could not be empty when hll key is exists!");
}
std::map<std::string, std::string> hll_map;
RETURN_IF_ERROR(StringParser::split_string_to_map(hll_value, ":", ",", &hll_map));
if (hll_map.empty()) {
return Status::InvalidArgument("Hll value could not transform to hll expr: " +
hll_value);
}
for (auto& hll_element : hll_map) {
columns_value += "," + hll_element.first + "=hll_hash(" + hll_element.second + ")";
}
}
put_request.__set_columns(columns_value);
}
auto column_separator_it = params.find(COLUMN_SEPARATOR_KEY);
if (column_separator_it != params.end()) {
put_request.__set_columnSeparator(column_separator_it->second);
}
if (ctx->timeout_second != -1) {
put_request.__set_timeout(ctx->timeout_second);
}
auto strict_mode_it = params.find(STRICT_MODE_KEY);
if (strict_mode_it != params.end()) {
std::string strict_mode_value = strict_mode_it->second;
if (iequal(strict_mode_value, "false")) {
put_request.__set_strictMode(false);
} else if (iequal(strict_mode_value, "true")) {
put_request.__set_strictMode(true);
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
}
}
// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&put_request, ctx](FrontendServiceConnection& client) {
client->streamLoadPut(ctx->put_result, put_request);
}));
Status plan_status(ctx->put_result.status);
if (!plan_status.ok()) {
LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status.get_error_msg()
<< ctx->brief();
return plan_status;
}
VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params);
return Status::OK();
}
// new on_header of mini load
Status MiniLoadAction::_on_new_header(HttpRequest* req) {
size_t body_bytes = 0;
size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024;
if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH));
if (body_bytes > max_body_bytes) {
std::stringstream ss;
ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes;
return Status::InvalidArgument(ss.str());
}
} else {
evhttp_connection_set_max_body_size(
evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes);
}
RETURN_IF_ERROR(check_request(req));
StreamLoadContext* ctx = new StreamLoadContext(_exec_env);
ctx->ref();
((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx = ctx;
// auth information
if (!parse_basic_auth(*req, &ctx->auth)) {
LOG(WARNING) << "parse basic authorization failed." << ctx->brief();
return Status::InvalidArgument("no valid Basic authorization");
}
ctx->load_type = TLoadType::MINI_LOAD;
ctx->load_src_type = TLoadSourceType::RAW;
ctx->db = req->param(DB_KEY);
ctx->table = req->param(TABLE_KEY);
ctx->label = req->param(LABEL_KEY);
if (!req->param(SUB_LABEL_KEY).empty()) {
ctx->sub_label = req->param(SUB_LABEL_KEY);
}
ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
std::map<std::string, std::string> params(req->query_params().begin(),
req->query_params().end());
auto max_filter_ratio_it = params.find(MAX_FILTER_RATIO_KEY);
if (max_filter_ratio_it != params.end()) {
ctx->max_filter_ratio = strtod(max_filter_ratio_it->second.c_str(), nullptr);
}
auto timeout_it = params.find(TIMEOUT_KEY);
if (timeout_it != params.end()) {
try {
ctx->timeout_second = std::stoi(timeout_it->second);
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid timeout format");
}
}
LOG(INFO) << "new income mini load request." << ctx->brief() << ", db: " << ctx->db
<< ", tbl: " << ctx->table;
// record metadata in frontend
RETURN_IF_ERROR(_begin_mini_load(ctx));
// open sink
auto pipe = std::make_shared<StreamLoadPipe>();
RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe));
ctx->body_sink = pipe;
// get plan from fe
RETURN_IF_ERROR(_process_put(req, ctx));
// execute plan
return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
}
void MiniLoadAction::_new_handle(HttpRequest* req) {
StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx;
DCHECK(ctx != nullptr);
if (ctx->status.ok()) {
ctx->status = _on_new_handle(ctx);
if (!ctx->status.ok()) {
LOG(WARNING) << "handle mini load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status.get_error_msg();
}
}
// if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn.
// PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt
// to see the final result.
if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) {
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx);
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
ctx->body_sink->cancel(ctx->status.get_error_msg());
}
}
std::string str = ctx->to_json_for_mini_load();
str += '\n';
HttpChannel::send_reply(req, str);
}
Status MiniLoadAction::_on_new_handle(StreamLoadContext* ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError("receive body don't equal with body bytes");
}
// wait stream load sink finish
RETURN_IF_ERROR(ctx->body_sink->finish());
// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());
// commit this load with mini load attachment
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx));
return Status::OK();
}
} // namespace doris

View File

@@ -1,109 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <map>
#include <mutex>
#include <set>
#include <string>
#include "common/status.h"
#include "gen_cpp/FrontendService.h"
#include "http/http_handler.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/defer_op.h"
namespace doris {
// Used to identify one mini load job
struct LoadHandle {
std::string db;
std::string label;
std::string sub_label;
};
struct LoadHandleCmp {
bool operator()(const LoadHandle& lhs, const LoadHandle& rhs) const;
};
class TMasterResult;
class ExecEnv;
class StreamLoadContext;
// This is a handler for mini load
// path is /api/{db}/{table}/_load
class MiniLoadAction : public HttpHandler {
public:
MiniLoadAction(ExecEnv* exec_env);
virtual ~MiniLoadAction() {}
void handle(HttpRequest* req) override;
bool request_will_be_read_progressively() override { return true; }
int on_header(HttpRequest* req) override;
void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(void* ctx) override;
void erase_handle(const LoadHandle& handle);
private:
Status _load(HttpRequest* req, const std::string& file_path, const std::string& user,
const std::string& cluster, int64_t file_size);
Status data_saved_dir(const LoadHandle& desc, const std::string& table, std::string* file_path);
Status _on_header(HttpRequest* http_req);
Status generate_check_load_req(const HttpRequest* http_req, TLoadCheckRequest* load_check_req);
Status check_auth(const HttpRequest* http_req, const TLoadCheckRequest& load_check_req);
void _on_chunk_data(HttpRequest* http_req);
void _handle(HttpRequest* http_req);
// streaming mini load
Status _on_new_header(HttpRequest* req);
Status _begin_mini_load(StreamLoadContext* ctx);
Status _process_put(HttpRequest* req, StreamLoadContext* ctx);
void _on_new_chunk_data(HttpRequest* http_req);
void _new_handle(HttpRequest* req);
Status _on_new_handle(StreamLoadContext* ctx);
bool _is_streaming(HttpRequest* req);
Status _merge_header(HttpRequest* http_req, std::map<std::string, std::string>* params);
const std::string _streaming_function_name = "STREAMING_MINI_LOAD";
ExecEnv* _exec_env;
std::mutex _lock;
// Used to check if load is duplicated in this instance.
std::set<LoadHandle, LoadHandleCmp> _current_load;
};
} // namespace doris

View File

@@ -60,7 +60,6 @@ set(RUNTIME_FILES
qsorter.cpp
fragment_mgr.cpp
dpp_sink_internal.cpp
etl_job_mgr.cpp
load_path_mgr.cpp
types.cpp
tmp_file_mgr.cc

View File

@@ -1,302 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/etl_job_mgr.h"
#include <filesystem>
#include <functional>
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/MasterService_types.h"
#include "gen_cpp/Status_types.h"
#include "gen_cpp/Types_types.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/plan_fragment_executor.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/file_utils.h"
#include "util/uid_util.h"
namespace doris {
#define VLOG_ETL VLOG_CRITICAL
std::string EtlJobMgr::to_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_download_load?"
<< "token=" << _exec_env->token() << "&file=" << file_name;
return url.str();
}
std::string EtlJobMgr::to_load_error_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_load_error_log?"
<< "file=" << file_name;
return url.str();
}
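With hypothetical values (localhost 10.0.0.1, webserver_port 8040, token t0), the two helpers above produce URLs of the form:
// to_http_path("f1"):            http://10.0.0.1:8040/api/_download_load?token=t0&file=f1
// to_load_error_http_path("f1"): http://10.0.0.1:8040/api/_load_error_log?file=f1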
const std::string DPP_NORMAL_ALL = "dpp.norm.ALL";
const std::string DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
const std::string ERROR_FILE_PREFIX = "error_log";
EtlJobMgr::EtlJobMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _success_jobs(5000), _failed_jobs(5000) {}
EtlJobMgr::~EtlJobMgr() {}
Status EtlJobMgr::init() {
return Status::OK();
}
Status EtlJobMgr::start_job(const TMiniLoadEtlTaskRequest& req) {
const TUniqueId& id = req.params.params.fragment_instance_id;
std::lock_guard<std::mutex> l(_lock);
auto it = _running_jobs.find(id);
if (it != _running_jobs.end()) {
// Already have this job; treat the resubmission as a no-op and return OK
LOG(INFO) << "Duplicated etl job(" << id << ")";
return Status::OK();
}
// If the job already succeeded, return Status::OK()
// and wait for the master to fetch the success information
if (_success_jobs.exists(id)) {
// Already success
LOG(INFO) << "Already successful etl job(" << id << ")";
return Status::OK();
}
RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
req.params, std::bind<void>(&EtlJobMgr::finalize_job, this, std::placeholders::_1)));
// redo this job if failed before
if (_failed_jobs.exists(id)) {
_failed_jobs.erase(id);
}
VLOG_ETL << "Job id(" << id << ") insert to EtlJobMgr.";
_running_jobs.insert(id);
return Status::OK();
}
void EtlJobMgr::report_to_master(PlanFragmentExecutor* executor) {
TUpdateMiniEtlTaskStatusRequest request;
RuntimeState* state = executor->runtime_state();
request.protocolVersion = FrontendServiceVersion::V1;
request.etlTaskId = state->fragment_instance_id();
Status status = get_job_state(state->fragment_instance_id(), &request.etlTaskStatus);
if (!status.ok()) {
return;
}
const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
config::thrift_rpc_timeout_ms, &status);
if (!status.ok()) {
std::stringstream ss;
ss << "Connect master failed, with address(" << master_address.hostname << ":"
<< master_address.port << ")";
LOG(WARNING) << ss.str();
return;
}
TFeResult res;
try {
try {
client->updateMiniEtlTaskStatus(res, request);
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "Retrying report etl jobs status to master(" << master_address.hostname
<< ":" << master_address.port << ") because: " << e.what();
status = client.reopen(config::thrift_rpc_timeout_ms);
if (!status.ok()) {
LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname
<< ":" << master_address.port << ")";
return;
}
client->updateMiniEtlTaskStatus(res, request);
}
} catch (apache::thrift::TException& e) {
// failed when retry.
// reopen to disable this connection
client.reopen(config::thrift_rpc_timeout_ms);
std::stringstream ss;
ss << "Report etl task to master(" << master_address.hostname << ":" << master_address.port
<< ") failed because: " << e.what();
LOG(WARNING) << ss.str();
}
// TODO(lingbin): check status of 'res' here.
// because there are some checks in updateMiniEtlTaskStatus, for example max_filter_ratio.
LOG(INFO) << "Successfully report elt job status to master.id=" << print_id(request.etlTaskId);
}
void EtlJobMgr::finalize_job(PlanFragmentExecutor* executor) {
EtlJobResult result;
RuntimeState* state = executor->runtime_state();
if (executor->status().ok()) {
// Get files
for (auto& it : state->output_files()) {
int64_t file_size = std::filesystem::file_size(it);
result.file_map[to_http_path(it)] = file_size;
}
// set statistics
result.process_normal_rows = state->num_rows_load_success();
result.process_abnormal_rows = state->num_rows_load_filtered();
} else {
// get debug path
result.process_normal_rows = state->num_rows_load_success();
result.process_abnormal_rows = state->num_rows_load_filtered();
}
result.debug_path = state->get_error_log_file_path();
finish_job(state->fragment_instance_id(), executor->status(), result);
// Try to report this finished task to master
report_to_master(executor);
}
Status EtlJobMgr::cancel_job(const TUniqueId& id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _running_jobs.find(id);
if (it == _running_jobs.end()) {
// Nothing to do
LOG(INFO) << "No such job id, just print to info " << id;
return Status::OK();
}
_running_jobs.erase(it);
VLOG_ETL << "id(" << id << ") have been removed from EtlJobMgr.";
EtlJobCtx job_ctx;
job_ctx.finish_status = Status::Cancelled("Cancelled");
_failed_jobs.put(id, job_ctx);
return Status::OK();
}
Status EtlJobMgr::finish_job(const TUniqueId& id, const Status& finish_status,
const EtlJobResult& result) {
std::lock_guard<std::mutex> l(_lock);
auto it = _running_jobs.find(id);
if (it == _running_jobs.end()) {
std::stringstream ss;
ss << "Unknown job id(" << id << ").";
return Status::InternalError(ss.str());
}
_running_jobs.erase(it);
EtlJobCtx ctx;
ctx.finish_status = finish_status;
ctx.result = result;
if (finish_status.ok()) {
_success_jobs.put(id, ctx);
} else {
_failed_jobs.put(id, ctx);
}
VLOG_ETL << "Move job(" << id << ") from running to "
<< (finish_status.ok() ? "success jobs" : "failed jobs");
return Status::OK();
}
Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result) {
std::lock_guard<std::mutex> l(_lock);
auto it = _running_jobs.find(id);
if (it != _running_jobs.end()) {
result->status.__set_status_code(TStatusCode::OK);
result->__set_etl_state(TEtlState::RUNNING);
return Status::OK();
}
// Successful
if (_success_jobs.exists(id)) {
EtlJobCtx ctx;
_success_jobs.get(id, &ctx);
result->status.__set_status_code(TStatusCode::OK);
result->__set_etl_state(TEtlState::FINISHED);
result->__set_file_map(ctx.result.file_map);
// set counter
std::map<std::string, std::string> counter;
counter[DPP_NORMAL_ALL] = std::to_string(ctx.result.process_normal_rows);
counter[DPP_ABNORMAL_ALL] = std::to_string(ctx.result.process_abnormal_rows);
result->__set_counters(counter);
if (!ctx.result.debug_path.empty()) {
result->__set_tracking_url(to_load_error_http_path(ctx.result.debug_path));
}
return Status::OK();
}
// failed information
if (_failed_jobs.exists(id)) {
EtlJobCtx ctx;
_failed_jobs.get(id, &ctx);
result->status.__set_status_code(TStatusCode::OK);
result->__set_etl_state(TEtlState::CANCELLED);
if (!ctx.result.debug_path.empty()) {
result->__set_tracking_url(to_http_path(ctx.result.debug_path));
}
return Status::OK();
}
// No such job
result->status.__set_status_code(TStatusCode::OK);
result->__set_etl_state(TEtlState::CANCELLED);
return Status::OK();
}
Status EtlJobMgr::erase_job(const TDeleteEtlFilesRequest& req) {
std::lock_guard<std::mutex> l(_lock);
const TUniqueId& id = req.mini_load_id;
auto it = _running_jobs.find(id);
if (it != _running_jobs.end()) {
std::stringstream ss;
ss << "Job(" << id << ") is running, can not be deleted.";
return Status::InternalError(ss.str());
}
_success_jobs.erase(id);
_failed_jobs.erase(id);
return Status::OK();
}
void EtlJobMgr::debug(std::stringstream& ss) {
// Make things easy
std::lock_guard<std::mutex> l(_lock);
// Debug summary
ss << "we have " << _running_jobs.size() << " jobs Running\n";
ss << "we have " << _failed_jobs.size() << " jobs Failed\n";
ss << "we have " << _success_jobs.size() << " jobs Successful\n";
// Debug running jobs
for (auto& it : _running_jobs) {
ss << "running jobs: " << it << "\n";
}
// Debug success jobs
for (auto& it : _success_jobs) {
ss << "successful jobs: " << it.first << "\n";
}
// Debug failed jobs
for (auto& it : _failed_jobs) {
ss << "failed jobs: " << it.first << "\n";
}
}
} // namespace doris
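Taken together, EtlJobMgr is a small state machine over one running set and two bounded LruCaches (5000 entries each, per the constructor); its transitions, summarized from the code above:
// start_job(id):  failed -> running (retry); running or already-success -> no-op, returns OK
// finish_job(id): running -> success (finish_status OK) or failed (otherwise)
// cancel_job(id): running -> failed (Status::Cancelled)
// erase_job(id):  success/failed entries removed; a running job can not be erased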

View File

@@ -1,99 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <pthread.h>
#include <mutex>
#include <string>
#include <unordered_set>
#include <vector>
#include "common/status.h"
#include "gen_cpp/Types_types.h"
#include "http/rest_monitor_iface.h"
#include "util/hash_util.hpp"
#include "util/lru_cache.hpp"
namespace doris {
// used to report to master
struct EtlJobResult {
EtlJobResult() : process_normal_rows(0), process_abnormal_rows(0) {}
std::string debug_path;
std::map<std::string, int64_t> file_map;
int64_t process_normal_rows;
int64_t process_abnormal_rows;
};
// used to report to master
struct EtlJobCtx {
Status finish_status;
EtlJobResult result;
};
class TMiniLoadEtlStatusResult;
class TMiniLoadEtlTaskRequest;
class ExecEnv;
class PlanFragmentExecutor;
class TDeleteEtlFilesRequest;
// manager of all the ETL jobs
// used because the master may loop to check whether a load job is finished.
class EtlJobMgr : public RestMonitorIface {
public:
EtlJobMgr(ExecEnv* exec_env);
virtual ~EtlJobMgr();
// make trash directory for collection
Status init();
// Move a job to the running state
// If this job already succeeded, return OK
// If this job failed before, move it from _failed_jobs back to _running_jobs
// Otherwise, put it into _running_jobs
Status start_job(const TMiniLoadEtlTaskRequest& req);
// Move a running job to the failed state
Status cancel_job(const TUniqueId& id);
Status finish_job(const TUniqueId& id, const Status& finish_status, const EtlJobResult& result);
Status get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result);
Status erase_job(const TDeleteEtlFilesRequest& req);
void finalize_job(PlanFragmentExecutor* executor);
virtual void debug(std::stringstream& ss);
private:
std::string to_http_path(const std::string& file_path);
std::string to_load_error_http_path(const std::string& file_path);
void report_to_master(PlanFragmentExecutor* executor);
ExecEnv* _exec_env;
std::mutex _lock;
std::unordered_set<TUniqueId> _running_jobs;
LruCache<TUniqueId, EtlJobCtx> _success_jobs;
LruCache<TUniqueId, EtlJobCtx> _failed_jobs;
};
} // namespace doris

View File

@@ -128,7 +128,6 @@ public:
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
TMasterInfo* master_info() { return _master_info; }
EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; }
TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; }
@@ -207,7 +206,6 @@ private:
FragmentMgr* _fragment_mgr = nullptr;
ResultCache* _result_cache = nullptr;
TMasterInfo* _master_info = nullptr;
EtlJobMgr* _etl_job_mgr = nullptr;
LoadPathMgr* _load_path_mgr = nullptr;
DiskIoMgr* _disk_io_mgr = nullptr;
TmpFileMgr* _tmp_file_mgr = nullptr;

View File

@@ -31,7 +31,6 @@
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/disk_io_mgr.h"
#include "runtime/etl_job_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/external_scan_context_mgr.h"
#include "runtime/fold_constant_executor.h"
@@ -128,7 +127,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
_master_info = new TMasterInfo();
_etl_job_mgr = new EtlJobMgr(this);
_load_path_mgr = new LoadPathMgr(this);
_disk_io_mgr = new DiskIoMgr();
_tmp_file_mgr = new TmpFileMgr(this);
@@ -147,7 +145,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_broker_client_cache->init_metrics("broker");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
_etl_job_mgr->init();
Status status = _load_path_mgr->init();
if (!status.ok()) {
LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
@@ -335,7 +332,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_tmp_file_mgr);
SAFE_DELETE(_disk_io_mgr);
SAFE_DELETE(_load_path_mgr);
SAFE_DELETE(_etl_job_mgr);
SAFE_DELETE(_master_info);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_cgroups_mgr);

View File

@@ -335,17 +335,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt
}
switch (ctx->load_type) {
case TLoadType::MINI_LOAD: {
attach->loadType = TLoadType::MINI_LOAD;
TMiniLoadTxnCommitAttachment ml_attach;
ml_attach.loadedRows = ctx->number_loaded_rows;
ml_attach.filteredRows = ctx->number_filtered_rows;
if (!ctx->error_url.empty()) {
ml_attach.__set_errorLogUrl(ctx->error_url);
}
attach->mlTxnCommitAttachment = std::move(ml_attach);
attach->__isset.mlTxnCommitAttachment = true;
LOG(FATAL) << "mini load is not supported any more";
break;
}
case TLoadType::ROUTINE_LOAD: {

View File

@@ -36,10 +36,6 @@ class ThriftServer;
class TAgentResult;
class TAgentTaskRequest;
class TAgentPublishRequest;
class TMiniLoadEtlTaskRequest;
class TMiniLoadEtlStatusResult;
class TMiniLoadEtlStatusRequest;
class TDeleteEtlFilesRequest;
class TPlanExecRequest;
class TPlanExecParams;
class TExecPlanFragmentParams;
@@ -97,23 +93,6 @@ public:
_agent_server->publish_cluster_state(result, request);
}
virtual void submit_etl_task(TAgentResult& result,
const TMiniLoadEtlTaskRequest& request) override {
VLOG_RPC << "submit_etl_task. request is "
<< apache::thrift::ThriftDebugString(request).c_str();
_agent_server->submit_etl_task(result, request);
}
virtual void get_etl_status(TMiniLoadEtlStatusResult& result,
const TMiniLoadEtlStatusRequest& request) override {
_agent_server->get_etl_status(result, request);
}
virtual void delete_etl_files(TAgentResult& result,
const TDeleteEtlFilesRequest& request) override {
_agent_server->delete_etl_files(result, request);
}
// DorisServer service
virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) override;

View File

@@ -25,7 +25,6 @@
#include "http/action/health_action.h"
#include "http/action/meta_action.h"
#include "http/action/metrics_action.h"
#include "http/action/mini_load.h"
#include "http/action/pprof_actions.h"
#include "http/action/reload_tablet_action.h"
#include "http/action/reset_rpc_channel_action.h"
@@ -57,9 +56,9 @@ Status HttpService::start() {
add_default_path_handlers(_web_page_handler.get(), MemTracker::get_process_tracker());
// register load
MiniLoadAction* miniload_action = _pool.add(new MiniLoadAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", miniload_action);
StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env));
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load",
streamload_action);
_ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load",
streamload_action);
StreamLoad2PCAction* streamload_2pc_action = _pool.add(new StreamLoad2PCAction(_env));

View File

@@ -215,7 +215,6 @@ set(RUNTIME_TEST_FILES
# runtime/dpp_sink_internal_test.cpp
# runtime/dpp_sink_test.cpp
# runtime/data_spliter_test.cpp
# runtime/etl_job_mgr_test.cpp
# runtime/tmp_file_mgr_test.cpp
# runtime/disk_io_mgr_test.cpp
# runtime/thread_resource_mgr_test.cpp

View File

@@ -94,15 +94,6 @@ public:
virtual void publish_cluster_state(TAgentResult& return_val,
const TAgentPublishRequest& request) {}
virtual void submit_etl_task(TAgentResult& return_val, const TMiniLoadEtlTaskRequest& request) {
}
virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val,
const TMiniLoadEtlStatusRequest& request) {}
virtual void delete_etl_files(TAgentResult& return_val, const TDeleteEtlFilesRequest& request) {
}
virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
const int32_t num_senders) {}

View File

@@ -1,221 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/etl_job_mgr.h"
#include <gtest/gtest.h>
#include "gen_cpp/Types_types.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "util/cpu_info.h"
namespace doris {
// Mock fragment mgr
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
return Status::OK();
}
FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _thread_pool(10, 128) {}
FragmentMgr::~FragmentMgr() {}
void FragmentMgr::debug(std::stringstream& ss) {}
class EtlJobMgrTest : public testing::Test {
public:
EtlJobMgrTest() {}
private:
ExecEnv _exec_env;
};
TEST_F(EtlJobMgrTest, NormalCase) {
EtlJobMgr mgr(&_exec_env);
TUniqueId id;
id.lo = 1;
id.hi = 1;
TMiniLoadEtlStatusResult res;
TMiniLoadEtlTaskRequest req;
TDeleteEtlFilesRequest del_req;
del_req.mini_load_id = id;
req.params.params.fragment_instance_id = id;
// make it running
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// make it finish
EtlJobResult job_result;
job_result.file_map["abc"] = 100L;
EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
EXPECT_EQ(1, res.file_map.size());
EXPECT_EQ(100, res.file_map["abc"]);
// erase it
EXPECT_TRUE(mgr.erase_job(del_req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
}
TEST_F(EtlJobMgrTest, DuplicateCase) {
EtlJobMgr mgr(&_exec_env);
TUniqueId id;
id.lo = 1;
id.hi = 1;
TMiniLoadEtlStatusResult res;
TMiniLoadEtlTaskRequest req;
req.params.params.fragment_instance_id = id;
// make it running
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// submit the same request again
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
}
TEST_F(EtlJobMgrTest, RunAfterSuccess) {
EtlJobMgr mgr(&_exec_env);
TUniqueId id;
id.lo = 1;
id.hi = 1;
TMiniLoadEtlStatusResult res;
TMiniLoadEtlTaskRequest req;
TDeleteEtlFilesRequest del_req;
del_req.mini_load_id = id;
req.params.params.fragment_instance_id = id;
// make it running
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// make it finish
EtlJobResult job_result;
job_result.file_map["abc"] = 100L;
EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
EXPECT_EQ(1, res.file_map.size());
EXPECT_EQ(100, res.file_map["abc"]);
// submit the same request again
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::FINISHED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
EXPECT_EQ(1, res.file_map.size());
EXPECT_EQ(100, res.file_map["abc"]);
}
TEST_F(EtlJobMgrTest, RunAfterFail) {
EtlJobMgr mgr(&_exec_env);
TUniqueId id;
id.lo = 1;
id.hi = 1;
TMiniLoadEtlStatusResult res;
TMiniLoadEtlTaskRequest req;
req.params.params.fragment_instance_id = id;
// make it running
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// make it finish
EtlJobResult job_result;
job_result.debug_path = "abc";
EXPECT_TRUE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// submit the same request again
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
}
TEST_F(EtlJobMgrTest, CancelJob) {
EtlJobMgr mgr(&_exec_env);
TUniqueId id;
id.lo = 1;
id.hi = 1;
TMiniLoadEtlStatusResult res;
TMiniLoadEtlTaskRequest req;
req.params.params.fragment_instance_id = id;
// make it running
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// cancel it
EtlJobResult job_result;
job_result.debug_path = "abc";
EXPECT_TRUE(mgr.cancel_job(id).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
// submit the same request again
EXPECT_TRUE(mgr.start_job(req).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::RUNNING, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
}
TEST_F(EtlJobMgrTest, FinishUnknownJob) {
EtlJobMgr mgr(&_exec_env);
TUniqueId id;
id.lo = 1;
id.hi = 1;
TMiniLoadEtlStatusResult res;
// try to finish an unknown job
EtlJobResult job_result;
job_result.debug_path = "abc";
EXPECT_FALSE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
EXPECT_TRUE(mgr.get_job_state(id, &res).ok());
EXPECT_EQ(TEtlState::CANCELLED, res.etl_state);
EXPECT_EQ(TStatusCode::OK, res.status.status_code);
}
} // namespace doris
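For reference, the deleted tests pin down a small job-lifecycle state machine: jobs move from RUNNING to FINISHED or CANCELLED, restarts are idempotent while a job is RUNNING or FINISHED, and unknown or erased jobs report CANCELLED. A behavior-equivalent Python sketch, using illustrative names rather than the C++ API:

RUNNING, FINISHED, CANCELLED = "RUNNING", "FINISHED", "CANCELLED"

class EtlJobMgrModel(object):
    def __init__(self):
        self.jobs = {}  # job id -> state

    def start_job(self, job_id):
        # Re-starting is a no-op while RUNNING or after FINISHED;
        # a CANCELLED job may be started again.
        if self.jobs.get(job_id) in (RUNNING, FINISHED):
            return True
        self.jobs[job_id] = RUNNING
        return True

    def finish_job(self, job_id, ok):
        if self.jobs.get(job_id) != RUNNING:
            return False  # finishing an unknown job fails
        self.jobs[job_id] = FINISHED if ok else CANCELLED
        return True

    def cancel_job(self, job_id):
        self.jobs[job_id] = CANCELLED
        return True

    def get_job_state(self, job_id):
        # Unknown or erased jobs report CANCELLED, matching the tests.
        return self.jobs.get(job_id, CANCELLED)

    def erase_job(self, job_id):
        self.jobs.pop(job_id, None)
        return True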

View File

@ -29,11 +29,9 @@ import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.FunctionParams;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.IsNullPredicate;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
@ -45,7 +43,6 @@ import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.FunctionSet;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
@ -88,14 +85,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentClient;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.transaction.TransactionNotFoundException;
@ -230,140 +225,6 @@ public class Load {
lock.writeLock().unlock();
}
// return true if we truly added the load job
// return false otherwise (e.g., a retry request)
@Deprecated
public boolean addMiniLoadJob(TMiniLoadRequest request) throws DdlException {
// get params
String fullDbName = request.getDb();
String tableName = request.getTbl();
String label = request.getLabel();
long timestamp = 0;
if (request.isSetTimestamp()) {
timestamp = request.getTimestamp();
}
TNetworkAddress beAddr = request.getBackend();
String filePathsValue = request.getFiles().get(0);
Map<String, String> params = request.getProperties();
// create load stmt
// label name
LabelName labelName = new LabelName(fullDbName, label);
// data descriptions
// file paths
if (Strings.isNullOrEmpty(filePathsValue)) {
throw new DdlException("File paths are not specified");
}
List<String> filePaths = Arrays.asList(filePathsValue.split(","));
// partitions | column names | separator | line delimiter
List<String> partitionNames = null;
List<String> columnNames = null;
Separator columnSeparator = null;
List<String> hllColumnPairList = null;
Separator lineDelimiter = null;
String formatType = null;
if (params != null) {
String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS);
if (!Strings.isNullOrEmpty(specifiedPartitions)) {
partitionNames = Arrays.asList(specifiedPartitions.split(","));
}
String specifiedColumns = params.get(LoadStmt.KEY_IN_PARAM_COLUMNS);
if (!Strings.isNullOrEmpty(specifiedColumns)) {
columnNames = Arrays.asList(specifiedColumns.split(","));
}
final String hll = params.get(LoadStmt.KEY_IN_PARAM_HLL);
if (!Strings.isNullOrEmpty(hll)) {
hllColumnPairList = Arrays.asList(hll.split(":"));
}
String columnSeparatorStr = params.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR);
if (columnSeparatorStr != null) {
if (columnSeparatorStr.isEmpty()) {
columnSeparatorStr = "\t";
}
columnSeparator = new Separator(columnSeparatorStr);
try {
columnSeparator.analyze();
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER);
if (lineDelimiterStr != null) {
if (lineDelimiterStr.isEmpty()) {
lineDelimiterStr = "\n";
}
lineDelimiter = new Separator(lineDelimiterStr);
try {
lineDelimiter.analyze();
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE);
}
DataDescription dataDescription = new DataDescription(
tableName,
partitionNames != null ? new PartitionNames(false, partitionNames) : null,
filePaths,
columnNames,
columnSeparator,
formatType,
false,
null
);
dataDescription.setLineDelimiter(lineDelimiter);
dataDescription.setBeAddr(beAddr);
// parse hll param pair
if (hllColumnPairList != null) {
for (int i = 0; i < hllColumnPairList.size(); i++) {
final String pairStr = hllColumnPairList.get(i);
final List<String> pairList = Arrays.asList(pairStr.split(","));
if (pairList.size() != 2) {
throw new DdlException("hll param format error");
}
final String resultColumn = pairList.get(0);
final String hashColumn = pairList.get(1);
final Pair<String, List<String>> pair = new Pair<String, List<String>>(FunctionSet.HLL_HASH,
Arrays.asList(hashColumn));
dataDescription.addColumnMapping(resultColumn, pair);
}
}
List<DataDescription> dataDescriptions = Lists.newArrayList(dataDescription);
// job properties
Map<String, String> properties = Maps.newHashMap();
if (params != null) {
String maxFilterRatio = params.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY);
if (!Strings.isNullOrEmpty(maxFilterRatio)) {
properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio);
}
String timeout = params.get(LoadStmt.TIMEOUT_PROPERTY);
if (!Strings.isNullOrEmpty(timeout)) {
properties.put(LoadStmt.TIMEOUT_PROPERTY, timeout);
}
}
LoadStmt stmt = new LoadStmt(labelName, dataDescriptions, null, null, properties);
// try to register mini label
if (!registerMiniLabel(fullDbName, label, timestamp)) {
return false;
}
try {
addLoadJob(stmt, EtlJobType.MINI, timestamp);
return true;
} finally {
deregisterMiniLabel(fullDbName, label);
}
}
public void addLoadJob(LoadStmt stmt, EtlJobType etlJobType, long timestamp) throws DdlException {
// get db
String dbName = stmt.getLabel().getDbName();
@ -2641,24 +2502,6 @@ public class Load {
}
break;
case MINI:
for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
long backendId = taskInfo.getBackendId();
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
if (backend == null) {
LOG.warn("backend does not exist. id: {}", backendId);
break;
}
long dbId = job.getDbId();
Database db = Catalog.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db does not exist. id: {}", dbId);
break;
}
AgentClient client = new AgentClient(backend.getHost(), backend.getBePort());
client.deleteEtlFiles(dbId, job.getId(), db.getFullName(), job.getLabel());
}
break;
case INSERT:
break;
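The removed addMiniLoadJob above parsed the request's hll property as colon-separated "resultColumn,hashColumn" pairs, mapping each result column to hll_hash(hashColumn). A small Python sketch of that format, for illustration only:

def parse_hll_pairs(hll_value):
    # hll_value looks like "resultCol1,hashCol1:resultCol2,hashCol2".
    mappings = []
    for pair_str in hll_value.split(":"):
        pair = pair_str.split(",")
        if len(pair) != 2:
            raise ValueError("hll param format error")
        result_column, hash_column = pair
        mappings.append((result_column, "hll_hash(%s)" % hash_column))
    return mappings

# parse_hll_pairs("uv,user_id:pv_set,page_id")
# -> [('uv', 'hll_hash(user_id)'), ('pv_set', 'hll_hash(page_id)')]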

View File

@ -22,14 +22,12 @@ import org.apache.doris.analysis.CompoundPredicate.Operator;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DataQualityException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.PatternMatcher;
@ -41,9 +39,6 @@ import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.load.FailMsg.CancelType;
import org.apache.doris.load.Load;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionState;
@ -137,53 +132,6 @@ public class LoadManager implements Writable {
.filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count();
}
/**
* This method will be invoked by the streaming mini load.
* It begins the mini load txn immediately, without any scheduler.
*
*/
public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException {
String cluster = SystemInfoService.DEFAULT_CLUSTER;
if (request.isSetCluster()) {
cluster = request.getCluster();
}
Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
Table table = database.getTableOrDdlException(request.tbl);
MiniLoadJob loadJob = null;
writeLock();
try {
loadJob = new MiniLoadJob(database.getId(), table.getId(), request);
// call unprotectedExecute before adding the load job, so that if the job fails to start there is no need to add it.
// NOTICE(cmy): this order is only for Mini Load, because mini load's
// unprotectedExecute() only does beginTxn().
// For other kinds of load job, execute the job after adding it.
// A mini load job must be executed before releasing the write lock.
// Otherwise, a duplicated request may get the transaction id before the mini load transaction has begun.
loadJob.beginTxn();
loadJob.unprotectedExecute();
createLoadJob(loadJob);
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
e.getTxnId());
return e.getTxnId();
} catch (UserException e) {
if (loadJob != null) {
loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false,
false /* no need to write edit log, because the createLoadJob log is not written yet */);
}
throw e;
} finally {
writeUnlock();
}
// Persisting the mini load must be the final step of creating it.
// After the mini load is executed, the txn id has been set and the state has changed to loading.
// Both need to be recorded in persistence.
Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob);
return loadJob.getTransactionId();
}
/**
* This method will be invoked by version1 of broker or hadoop load.
* It is used to check the label of v1 and v2 at the same time.
@ -209,31 +157,6 @@ public class LoadManager implements Writable {
}
}
/**
* This method will be invoked by the non-streaming mini load.
* It is used to check the label of v1 and v2 at the same time.
* Finally, the non-streaming mini load belongs to the load class.
*
* @param request request
* @return false if the mini load is a duplicated load; true otherwise.
* @deprecated mini load is no longer supported
*/
@Deprecated
public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException {
String cluster = SystemInfoService.DEFAULT_CLUSTER;
if (request.isSetCluster()) {
cluster = request.getCluster();
}
Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb()));
writeLock();
try {
checkLabelUsed(database.getId(), request.getLabel());
return Catalog.getCurrentCatalog().getLoadInstance().addMiniLoadJob(request);
} finally {
writeUnlock();
}
}
/**
* MultiLoadMgr use.
**/
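The NOTICE in the removed createLoadJobFromMiniLoad encodes an ordering argument: the transaction must be begun inside the write lock, before the job is registered, so a duplicated request always observes a job whose txn id is already set. A deliberately simplified Python sketch of that invariant (not the FE implementation, which detects duplicates via DuplicatedRequestException from the transaction manager):

import threading

_lock = threading.Lock()
_jobs = {}  # label -> txn id

def create_mini_load(label, begin_txn):
    with _lock:
        if label in _jobs:           # duplicated request
            return _jobs[label]      # previous txn id, always already set
        txn_id = begin_txn()         # begin the txn BEFORE registering the job
        _jobs[label] = txn_id
        return txn_id

# A second call with the same label returns the original txn id instead of
# beginning a new transaction.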

View File

@ -20,21 +20,11 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.io.Text;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -55,22 +45,6 @@ public class MiniLoadJob extends LoadJob {
super(EtlJobType.MINI);
}
public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException {
super(EtlJobType.MINI, dbId, request.getLabel());
this.tableId = tableId;
this.tableName = request.getTbl();
if (request.isSetTimeoutSecond()) {
setTimeout(request.getTimeoutSecond());
}
if (request.isSetMaxFilterRatio()) {
setMaxFilterRatio(request.getMaxFilterRatio());
}
this.createTimestamp = request.getCreateTimestamp();
this.loadStartTimestamp = createTimestamp;
this.authorizationInfo = gatherAuthInfo();
this.requestId = request.getRequestId();
}
@Override
public Set<String> getTableNamesForShow() {
return Sets.newHashSet(tableName);
@ -87,15 +61,7 @@ public class MiniLoadJob extends LoadJob {
}
@Override
public void beginTxn()
throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException,
QuotaExceedException, MetaNotFoundException {
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId,
new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
getTimeout());
}
public void beginTxn() {}
@Override
protected void replayTxnAttachment(TransactionState txnState) {

View File

@ -18,7 +18,6 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.common.io.Text;
import org.apache.doris.thrift.TMiniLoadTxnCommitAttachment;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TxnCommitAttachment;
@ -36,15 +35,6 @@ public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment {
super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
}
public MiniLoadTxnCommitAttachment(TMiniLoadTxnCommitAttachment tMiniLoadTxnCommitAttachment) {
super(TransactionState.LoadJobSourceType.BACKEND_STREAMING);
this.loadedRows = tMiniLoadTxnCommitAttachment.getLoadedRows();
this.filteredRows = tMiniLoadTxnCommitAttachment.getFilteredRows();
if (tMiniLoadTxnCommitAttachment.isSetErrorLogUrl()) {
this.errorLogUrl = tMiniLoadTxnCommitAttachment.getErrorLogUrl();
}
}
public long getLoadedRows() {
return loadedRows;
}

View File

@ -43,15 +43,12 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.collect.Streams;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
@ -64,7 +61,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
// Class used to record state of multi-load operation
public class MultiLoadMgr {
@ -107,18 +103,6 @@ public class MultiLoadMgr {
Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromMultiStart(fullDbName, label);
}
public void load(TMiniLoadRequest request) throws DdlException {
if (CollectionUtils.isNotEmpty(request.getFileSize())
&& request.getFileSize().size() != request.getFiles().size()) {
throw new DdlException("files count and file size count not match: [" + request.getFileSize().size()
+ "!=" + request.getFiles().size() + "]");
}
List<Pair<String, Long>> files = Streams.zip(request.getFiles().stream(),
request.getFileSize().stream(), Pair::create).collect(Collectors.toList());
load(request.getDb(), request.getLabel(), request.getSubLabel(), request.getTbl(),
files, request.getBackend(), request.getProperties(), request.getTimestamp());
}
// Add one load job
private void load(String fullDbName, String label,
String subLabel, String table,
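The removed MultiLoadMgr.load simply validated that the file list and the file-size list had matching lengths and zipped them into (path, size) pairs; in Python terms:

files = ["/tmp/a.csv", "/tmp/b.csv"]          # illustrative values
file_sizes = [1024, 2048]
if file_sizes and len(file_sizes) != len(files):
    raise ValueError("file count and file size count do not match")
pairs = list(zip(files, file_sizes))  # [('/tmp/a.csv', 1024), ('/tmp/b.csv', 2048)]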

View File

@ -43,16 +43,10 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.datasource.DataSourceIf;
import org.apache.doris.datasource.InternalDataSource;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.StreamLoadPlanner;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditEvent.AuditEventBuilder;
import org.apache.doris.plugin.AuditEvent.EventType;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.QeProcessorImpl;
@ -77,10 +71,8 @@ import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
import org.apache.doris.thrift.TIsMethodSupportedRequest;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableStatusResult;
import org.apache.doris.thrift.TLoadCheckRequest;
import org.apache.doris.thrift.TLoadTxn2PCRequest;
import org.apache.doris.thrift.TLoadTxn2PCResult;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
@ -92,10 +84,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TMiniLoadBeginRequest;
import org.apache.doris.thrift.TMiniLoadBeginResult;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TMiniLoadRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TReportExecStatusParams;
@ -109,9 +97,7 @@ import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.DatabaseTransactionMgr;
@ -121,7 +107,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -130,8 +115,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -479,197 +462,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return masterImpl.fetchResource();
}
@Deprecated
@Override
public TFeResult miniLoad(TMiniLoadRequest request) throws TException {
LOG.debug("receive mini load request: label: {}, db: {}, tbl: {}, backend: {}",
request.getLabel(), request.getDb(), request.getTbl(), request.getBackend());
ConnectContext context = new ConnectContext(null);
String cluster = SystemInfoService.DEFAULT_CLUSTER;
if (request.isSetCluster()) {
cluster = request.cluster;
}
final String fullDbName = ClusterNamespace.getFullName(cluster, request.db);
request.setDb(fullDbName);
context.setCluster(cluster);
context.setDatabase(ClusterNamespace.getFullName(cluster, request.db));
context.setQualifiedUser(ClusterNamespace.getFullName(cluster, request.user));
context.setCatalog(Catalog.getCurrentCatalog());
context.getState().reset();
context.setThreadLocalInfo();
TStatus status = new TStatus(TStatusCode.OK);
TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
try {
if (request.isSetSubLabel()) {
ExecuteEnv.getInstance().getMultiLoadMgr().load(request);
} else {
// try to add load job, label will be checked here.
if (Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromRequest(request)) {
try {
// generate mini load audit log
logMiniLoadStmt(request);
} catch (Exception e) {
LOG.warn("failed log mini load stmt", e);
}
}
}
} catch (UserException e) {
LOG.warn("add mini load error: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("unexpected exception when adding mini load", e);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
} finally {
ConnectContext.remove();
}
LOG.debug("mini load result: {}", result);
return result;
}
private void logMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
String stmt = getMiniLoadStmt(request);
AuditEvent auditEvent = new AuditEventBuilder().setEventType(EventType.AFTER_QUERY)
.setClientIp(request.user_ip + ":0")
.setUser(request.user)
.setDb(request.db)
.setState(TStatusCode.OK.name())
.setQueryTime(0)
.setStmt(stmt).build();
Catalog.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
}
private String getMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("curl --location-trusted -u user:passwd -T ");
if (request.files.size() == 1) {
stringBuilder.append(request.files.get(0));
} else if (request.files.size() > 1) {
stringBuilder.append("\"{").append(Joiner.on(",").join(request.files)).append("}\"");
}
InetAddress masterAddress = FrontendOptions.getLocalHost();
stringBuilder.append(" http://").append(masterAddress.getHostAddress()).append(":");
stringBuilder.append(Config.http_port).append("/api/").append(request.db).append("/");
stringBuilder.append(request.tbl).append("/_load?label=").append(request.label);
if (!request.properties.isEmpty()) {
stringBuilder.append("&");
List<String> props = Lists.newArrayList();
for (Map.Entry<String, String> entry : request.properties.entrySet()) {
String prop = entry.getKey() + "=" + entry.getValue();
props.add(prop);
}
stringBuilder.append(Joiner.on("&").join(props));
}
return stringBuilder.toString();
}
@Override
public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request) throws TException {
TFeResult result = new TFeResult();
result.setProtocolVersion(FrontendServiceVersion.V1);
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
// get job task info
TUniqueId etlTaskId = request.getEtlTaskId();
long jobId = etlTaskId.getHi();
long taskId = etlTaskId.getLo();
LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(jobId);
if (job == null) {
String failMsg = "job does not exist. id: " + jobId;
LOG.warn(failMsg);
status.setStatusCode(TStatusCode.CANCELLED);
status.addToErrorMsgs(failMsg);
return result;
}
MiniEtlTaskInfo taskInfo = job.getMiniEtlTask(taskId);
if (taskInfo == null) {
String failMsg = "task info does not exist. task id: " + taskId + ", job id: " + jobId;
LOG.warn(failMsg);
status.setStatusCode(TStatusCode.CANCELLED);
status.addToErrorMsgs(failMsg);
return result;
}
// update etl task status
TMiniLoadEtlStatusResult statusResult = request.getEtlTaskStatus();
LOG.debug("load job id: {}, etl task id: {}, status: {}", jobId, taskId, statusResult);
EtlStatus taskStatus = taskInfo.getTaskStatus();
if (taskStatus.setState(statusResult.getEtlState())) {
if (statusResult.isSetCounters()) {
taskStatus.setCounters(statusResult.getCounters());
}
if (statusResult.isSetTrackingUrl()) {
taskStatus.setTrackingUrl(statusResult.getTrackingUrl());
}
if (statusResult.isSetFileMap()) {
taskStatus.setFileMap(statusResult.getFileMap());
}
}
return result;
}
@Override
public TMiniLoadBeginResult miniLoadBegin(TMiniLoadBeginRequest request) throws TException {
LOG.debug("receive mini load begin request. label: {}, user: {}, ip: {}",
request.getLabel(), request.getUser(), request.getUserIp());
TMiniLoadBeginResult result = new TMiniLoadBeginResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
String cluster = SystemInfoService.DEFAULT_CLUSTER;
if (request.isSetCluster()) {
cluster = request.cluster;
}
// step1: check password and privs
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
// step2: check label and record metadata in load manager
if (request.isSetSubLabel()) {
// TODO(ml): multi mini load
} else {
// add load metadata in loadManager
result.setTxnId(Catalog.getCurrentCatalog().getLoadManager().createLoadJobFromMiniLoad(request));
}
return result;
} catch (UserException e) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
return result;
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
}
@Override
public TFeResult isMethodSupported(TIsMethodSupportedRequest request) throws TException {
TStatus status = new TStatus(TStatusCode.OK);
TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
switch (request.getFunctionName()) {
case "STREAMING_MINI_LOAD":
break;
default:
status.setStatusCode(TStatusCode.NOT_IMPLEMENTED_ERROR);
break;
}
return result;
}
@Override
public TMasterOpResult forward(TMasterOpRequest params) throws TException {
TNetworkAddress clientAddr = getClientAddr();
@ -723,35 +515,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
}
@Override
public TFeResult loadCheck(TLoadCheckRequest request) throws TException {
LOG.debug("receive load check request. label: {}, user: {}, ip: {}",
request.getLabel(), request.getUser(), request.getUserIp());
TStatus status = new TStatus(TStatusCode.OK);
TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
try {
String cluster = SystemInfoService.DEFAULT_CLUSTER;
if (request.isSetCluster()) {
cluster = request.cluster;
}
checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
} catch (UserException e) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
return result;
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
@Override
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
String clientAddr = getClientAddrAsString();

View File

@ -21,14 +21,9 @@ import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Status;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TAgentResult;
import org.apache.doris.thrift.TAgentServiceVersion;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TSnapshotRequest;
import org.apache.doris.thrift.TStatus;
@ -52,22 +47,6 @@ public class AgentClient {
this.port = port;
}
public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) {
TAgentResult result = null;
LOG.debug("submit etl task. request: {}", request);
try {
borrowClient();
// submit etl task
result = client.submitEtlTask(request);
ok = true;
} catch (Exception e) {
LOG.warn("submit etl task error", e);
} finally {
returnClient();
}
return result;
}
public TAgentResult makeSnapshot(TSnapshotRequest request) {
TAgentResult result = null;
LOG.debug("submit make snapshot task. request: {}", request);
@ -116,24 +95,6 @@ public class AgentClient {
return result;
}
public TMiniLoadEtlStatusResult getEtlStatus(long jobId, long taskId) {
TMiniLoadEtlStatusResult result = null;
TMiniLoadEtlStatusRequest request = new TMiniLoadEtlStatusRequest(TAgentServiceVersion.V1,
new TUniqueId(jobId, taskId));
LOG.debug("get mini load etl task status. request: {}", request);
try {
borrowClient();
// get etl status
result = client.getEtlStatus(request);
ok = true;
} catch (Exception e) {
LOG.warn("get etl status error", e);
} finally {
returnClient();
}
return result;
}
public TExportStatusResult getExportStatus(long jobId, long taskId) {
TExportStatusResult result = null;
TUniqueId request = new TUniqueId(jobId, taskId);
@ -183,22 +144,6 @@ public class AgentClient {
return result;
}
public void deleteEtlFiles(long dbId, long jobId, String dbName, String label) {
TDeleteEtlFilesRequest request = new TDeleteEtlFilesRequest(TAgentServiceVersion.V1,
new TUniqueId(dbId, jobId), dbName, label);
LOG.debug("delete etl files. request: {}", request);
try {
borrowClient();
// delete etl files
client.deleteEtlFiles(request);
ok = true;
} catch (Exception e) {
LOG.warn("delete etl files error", e);
} finally {
returnClient();
}
}
private void borrowClient() throws Exception {
// create agent client
ok = false;
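Each deleted RPC wrapper in AgentClient followed the same borrow/call/return pattern around a pooled thrift client, using the ok flag so the pool can discard broken connections instead of reusing them. The pattern as a generic Python sketch (pool.borrow and pool.give_back are hypothetical names, not the Doris ClientPool API):

def call_with_client(pool, address, fn):
    # Borrow a pooled client, invoke the RPC, and hand the client back.
    # On failure the client is returned marked broken so the pool closes it.
    client = pool.borrow(address)
    ok = False
    try:
        result = fn(client)
        ok = True
        return result
    finally:
        pool.give_back(client, broken=not ok)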

View File

@ -47,8 +47,6 @@ public abstract class TxnCommitAttachment implements Writable {
switch (txnCommitAttachment.getLoadType()) {
case ROUTINE_LOAD:
return new RLTaskTxnCommitAttachment(txnCommitAttachment.getRlTaskTxnCommitAttachment());
case MINI_LOAD:
return new MiniLoadTxnCommitAttachment(txnCommitAttachment.getMlTxnCommitAttachment());
default:
return null;
}

View File

@ -25,7 +25,6 @@ import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
@ -33,9 +32,6 @@ import org.apache.doris.thrift.TExportStatusResult;
import org.apache.doris.thrift.TExportTaskRequest;
import org.apache.doris.thrift.TFetchDataParams;
import org.apache.doris.thrift.TFetchDataResult;
import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultBatch;
import org.apache.doris.thrift.TRoutineLoadTask;
@ -156,21 +152,6 @@ public class GenericPoolTest {
return null;
}
@Override
public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
return null;
}
@Override
public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
return null;
}
@Override
public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
return null;
}
@Override
public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException {
// TODO Auto-generated method stub

View File

@ -34,9 +34,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams;
import org.apache.doris.thrift.TCancelPlanFragmentResult;
import org.apache.doris.thrift.TCheckStorageFormatResult;
import org.apache.doris.thrift.TCloneReq;
import org.apache.doris.thrift.TDeleteEtlFilesRequest;
import org.apache.doris.thrift.TDiskTrashInfo;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TExecPlanFragmentResult;
import org.apache.doris.thrift.TExportState;
@ -47,9 +45,6 @@ import org.apache.doris.thrift.TFetchDataResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TMiniLoadEtlStatusRequest;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TRoutineLoadTask;
import org.apache.doris.thrift.TScanBatchResult;
import org.apache.doris.thrift.TScanCloseParams;
@ -243,21 +238,6 @@ public class MockedBackendFactory {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException {
return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED);
}
@Override
public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException {
return new TAgentResult(new TStatus(TStatusCode.OK));
}
@Override
public TStatus submitExportTask(TExportTaskRequest request) throws TException {
return new TStatus(TStatusCode.OK);

View File

@ -409,29 +409,3 @@ struct TAgentPublishRequest {
2: required list<TTopicUpdate> updates
}
struct TMiniLoadEtlTaskRequest {
1: required TAgentServiceVersion protocol_version
2: required PaloInternalService.TExecPlanFragmentParams params
}
struct TMiniLoadEtlStatusRequest {
1: required TAgentServiceVersion protocol_version
2: required Types.TUniqueId mini_load_id
}
struct TMiniLoadEtlStatusResult {
1: required Status.TStatus status
2: required Types.TEtlState etl_state
3: optional map<string, i64> file_map
4: optional map<string, string> counters
5: optional string tracking_url
// progress
}
struct TDeleteEtlFilesRequest {
1: required TAgentServiceVersion protocol_version
2: required Types.TUniqueId mini_load_id
3: required string db_name
4: required string label
}

View File

@ -146,13 +146,6 @@ service BackendService {
AgentService.TAgentResult publish_cluster_state(1:AgentService.TAgentPublishRequest request);
AgentService.TAgentResult submit_etl_task(1:AgentService.TMiniLoadEtlTaskRequest request);
AgentService.TMiniLoadEtlStatusResult get_etl_status(
1:AgentService.TMiniLoadEtlStatusRequest request);
AgentService.TAgentResult delete_etl_files(1:AgentService.TDeleteEtlFilesRequest request);
Status.TStatus submit_export_task(1:TExportTaskRequest request);
PaloInternalService.TExportStatusResult get_export_status(1:Types.TUniqueId task_id);

View File

@ -411,31 +411,6 @@ struct TFeResult {
2: required Status.TStatus status
}
// Submit one table load job
// if subLabel is set, this job belongs to a multi-load transaction
struct TMiniLoadRequest {
1: required FrontendServiceVersion protocolVersion
2: required string db
3: required string tbl
4: required string label
5: optional string user
6: required Types.TNetworkAddress backend
7: required list<string> files
8: required map<string, string> properties
9: optional string subLabel
10: optional string cluster
11: optional i64 timestamp
12: optional string user_ip
13: optional bool is_retry
14: optional list<i64> file_size
}
struct TUpdateMiniEtlTaskStatusRequest {
1: required FrontendServiceVersion protocolVersion
2: required Types.TUniqueId etlTaskId
3: required AgentService.TMiniLoadEtlStatusResult etlTaskStatus
}
struct TMasterOpRequest {
1: required string user
2: required string db
@ -483,44 +458,6 @@ struct TMasterOpResult {
4: optional Types.TUniqueId queryId;
}
struct TLoadCheckRequest {
1: required FrontendServiceVersion protocolVersion
2: required string user
3: required string passwd
4: required string db
5: optional string label
6: optional string cluster
7: optional i64 timestamp
8: optional string user_ip
9: optional string tbl
}
struct TMiniLoadBeginRequest {
1: required string user
2: required string passwd
3: optional string cluster
4: optional string user_ip
5: required string db
6: required string tbl
7: required string label
8: optional string sub_label
9: optional i64 timeout_second
10: optional double max_filter_ratio
11: optional i64 auth_code
12: optional i64 create_timestamp
13: optional Types.TUniqueId request_id
14: optional string auth_code_uuid
}
struct TIsMethodSupportedRequest {
1: optional string function_name
}
struct TMiniLoadBeginResult {
1: required Status.TStatus status
2: optional i64 txn_id
}
struct TUpdateExportTaskStatusRequest {
1: required FrontendServiceVersion protocolVersion
2: required Types.TUniqueId taskId
@ -627,16 +564,10 @@ struct TRLTaskTxnCommitAttachment {
11: optional string errorLogUrl
}
struct TMiniLoadTxnCommitAttachment {
1: required i64 loadedRows
2: required i64 filteredRows
3: optional string errorLogUrl
}
struct TTxnCommitAttachment {
1: required Types.TLoadType loadType
2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment
3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment
// 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment
}
struct TLoadTxnCommitRequest {
@ -751,14 +682,6 @@ service FrontendService {
MasterService.TMasterResult finishTask(1: MasterService.TFinishTaskRequest request)
MasterService.TMasterResult report(1: MasterService.TReportRequest request)
MasterService.TFetchResourceResult fetchResource()
// these three methods are used for the asynchronous mini load, which will be abandoned
TFeResult miniLoad(1: TMiniLoadRequest request)
TFeResult updateMiniEtlTaskStatus(1: TUpdateMiniEtlTaskStatusRequest request)
TFeResult loadCheck(1: TLoadCheckRequest request)
// this method is used for streaming mini load
TMiniLoadBeginResult miniLoadBegin(1: TMiniLoadBeginRequest request)
TFeResult isMethodSupported(1: TIsMethodSupportedRequest request)
TMasterOpResult forward(1: TMasterOpRequest params)

View File

@ -1,157 +0,0 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
"""
import os
import subprocess
import time

# MySQLdb (the mysql-python package) must be installed separately; it is
# needed by check_load_status below.
import MySQLdb


class DorisMiniLoadClient(object):
    """ load file to doris """

    def __init__(self, db_host, db_port, db_name,
                 db_user, db_password, file_name, table, load_timeout):
        """
        init
        :param db_host: db host
        :param db_port: db port
        :param db_name: db name
        :param db_user: db user
        :param db_password: db password
        :param file_name: local file path
        :param table: db table
        :param load_timeout: mini load timeout, default 86400 seconds.
        """
        self.file_name = file_name
        self.table = table
        self.load_host = db_host
        self.load_port = db_port
        self.load_database = db_name
        self.load_user = db_user
        self.load_password = db_password
        self.load_timeout = load_timeout

    def get_label(self):
        """
        Build the load label from the table name and the file's base name.
        :return: label
        """
        return '_'.join([self.table, os.path.basename(self.file_name)])

    def load_doris(self):
        """
        load file to doris by curl, allow 3 times to retry.
        :return: mini load label
        """
        retry_time = 0
        label = self.get_label()
        while retry_time < 3:
            load_cmd = "curl"
            param_location = "--location-trusted"
            param_user = "%s:%s" % (self.load_user, self.load_password)
            param_file = "%s" % self.file_name
            param_url = "http://%s:%s/api/%s/%s/_load?label=%s&timeout=%s" % (
                self.load_host, self.load_port, self.load_database,
                self.table, label, self.load_timeout)
            load_subprocess = subprocess.Popen([load_cmd, param_location,
                                                "-u", param_user, "-T", param_file, param_url])
            # Wait for the child process to terminate, then check its returncode.
            load_subprocess.wait()
            # If it failed, retry up to 3 times.
            if load_subprocess.returncode != 0:
                print """Load to doris failed! LABEL is %s, Retry time is %d""" % (label, retry_time)
                retry_time += 1
            # If it succeeded, print a log line and break the retry loop.
            if load_subprocess.returncode == 0:
                print """Load to doris success! LABEL is %s, Retry time is %d""" % (label, retry_time)
                break
        return label

    @classmethod
    def check_load_status(cls, label, host, port, user, password, database):
        """
        check async mini load process status.
        :param label: mini load label
        :param host: db host
        :param port: db port
        :param user: db user
        :param password: db password
        :param database: db database
        :return: None; polls until the load finishes, is cancelled, or times out.
        """
        db_conn = MySQLdb.connect(host=host, port=port, user=user, passwd=password, db=database)
        db_cursor = db_conn.cursor()
        check_status_sql = "show load where label = '%s' order by CreateTime desc limit 1" % label
        db_cursor.execute(check_status_sql)
        rows = db_cursor.fetchall()
        # timeout config: 60 minutes, polled every 5 seconds.
        timeout = 60 * 60
        while timeout > 0:
            if len(rows) == 0:
                print """Load label: %s doesn't exist""" % label
                return
            load_status = rows[0][2]
            print "mini load status: " + load_status
            if load_status == 'FINISHED':
                print """Async mini load to db success! label is %s""" % label
                break
            if load_status == 'CANCELLED':
                print """Async load to db failed! label is %s""" % label
                break
            timeout = timeout - 5
            time.sleep(5)
            db_cursor.execute(check_status_sql)
            rows = db_cursor.fetchall()
        if timeout <= 0:
            print """Async load to db timeout! timeout second is: %s, label is %s""" % (timeout, label)


if __name__ == '__main__':
    """
    mini_load demo.
    There is no need to install subprocess in Python 2.7; it is a built-in
    standard module. You need to fill in the db config & load params below.
    """
    db_host = "db_conn_host"
    db_port = "port"
    db_name = "db_name"
    db_user = "db_user"
    db_password = "db_password"
    file_name = "file_name"
    table = "db_table"
    # default load_timeout, seconds
    load_timeout = 86400
    doris_client = DorisMiniLoadClient(
        db_host, db_port, db_name, db_user, db_password, file_name, table, load_timeout)
    doris_client.check_load_status(doris_client.load_doris(), db_host, db_port,
                                   db_user, db_password, db_name)
    print "load to doris end"