From aab7dc956f6f47a79d70bafd1ffc76b676a8e2cd Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Thu, 30 Jun 2022 23:21:41 +0800 Subject: [PATCH] [refactor](load) Remove mini load (#10520) --- be/src/agent/agent_server.cpp | 42 - be/src/agent/agent_server.h | 6 - be/src/http/CMakeLists.txt | 1 - be/src/http/action/mini_load.cpp | 966 ------------------ be/src/http/action/mini_load.h | 109 -- be/src/runtime/CMakeLists.txt | 1 - be/src/runtime/etl_job_mgr.cpp | 302 ------ be/src/runtime/etl_job_mgr.h | 99 -- be/src/runtime/exec_env.h | 2 - be/src/runtime/exec_env_init.cpp | 4 - .../stream_load/stream_load_executor.cpp | 12 +- be/src/service/backend_service.h | 21 - be/src/service/http_service.cpp | 5 +- be/test/CMakeLists.txt | 1 - be/test/runtime/data_stream_test.cpp | 9 - be/test/runtime/etl_job_mgr_test.cpp | 221 ---- .../main/java/org/apache/doris/load/Load.java | 157 --- .../apache/doris/load/loadv2/LoadManager.java | 77 -- .../apache/doris/load/loadv2/MiniLoadJob.java | 36 +- .../loadv2/MiniLoadTxnCommitAttachment.java | 10 - .../org/apache/doris/qe/MultiLoadMgr.java | 16 - .../doris/service/FrontendServiceImpl.java | 237 ----- .../org/apache/doris/task/AgentClient.java | 55 - .../transaction/TxnCommitAttachment.java | 2 - .../apache/doris/common/GenericPoolTest.java | 19 - .../doris/utframe/MockedBackendFactory.java | 20 - gensrc/thrift/AgentService.thrift | 26 - gensrc/thrift/BackendService.thrift | 7 - gensrc/thrift/FrontendService.thrift | 79 +- samples/mini_load/python/mini_load_utils.py | 157 --- 30 files changed, 5 insertions(+), 2694 deletions(-) delete mode 100644 be/src/http/action/mini_load.cpp delete mode 100644 be/src/http/action/mini_load.h delete mode 100644 be/src/runtime/etl_job_mgr.cpp delete mode 100644 be/src/runtime/etl_job_mgr.h delete mode 100644 be/test/runtime/etl_job_mgr_test.cpp delete mode 100644 samples/mini_load/python/mini_load_utils.py diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index c907aafd23..58ce4e00b5 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -29,7 +29,6 @@ #include "common/status.h" #include "gutil/strings/substitute.h" #include "olap/snapshot_manager.h" -#include "runtime/etl_job_mgr.h" using std::string; using std::vector; @@ -247,45 +246,4 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result, status.to_thrift(&t_agent_result.status); } -void AgentServer::submit_etl_task(TAgentResult& t_agent_result, - const TMiniLoadEtlTaskRequest& request) { - Status status = _exec_env->etl_job_mgr()->start_job(request); - auto fragment_instance_id = request.params.params.fragment_instance_id; - if (status.ok()) { - VLOG_RPC << "success to submit etl task. id=" << fragment_instance_id; - } else { - VLOG_RPC << "fail to submit etl task. id=" << fragment_instance_id - << ", err_msg=" << status.get_error_msg(); - } - status.to_thrift(&t_agent_result.status); -} - -void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result, - const TMiniLoadEtlStatusRequest& request) { - Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result); - if (!status.ok()) { - LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id << "]"; - } - - VLOG_RPC << "success to get job state. [id=" << request.mini_load_id - << ", status=" << t_agent_result.status.status_code - << ", etl_state=" << t_agent_result.etl_state << ", files="; - for (auto& item : t_agent_result.file_map) { - VLOG_RPC << item.first << ":" << item.second << ";"; - } - VLOG_RPC << "]"; -} - -void AgentServer::delete_etl_files(TAgentResult& t_agent_result, - const TDeleteEtlFilesRequest& request) { - Status status = _exec_env->etl_job_mgr()->erase_job(request); - if (!status.ok()) { - LOG(WARNING) << "fail to delete etl files. because " << status.get_error_msg() - << " with request " << request; - } - - VLOG_RPC << "success to delete etl files. request=" << request; - status.to_thrift(&t_agent_result.status); -} - } // namespace doris diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 3998a6b258..2897fab586 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -47,12 +47,6 @@ public: // [[deprecated]] void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request); - // Multi-Load will still use the following 3 methods for now. - void submit_etl_task(TAgentResult& agent_result, const TMiniLoadEtlTaskRequest& request); - void get_etl_status(TMiniLoadEtlStatusResult& agent_result, - const TMiniLoadEtlStatusRequest& request); - void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& request); - private: DISALLOW_COPY_AND_ASSIGN(AgentServer); diff --git a/be/src/http/CMakeLists.txt b/be/src/http/CMakeLists.txt index 8ca052da65..b956d0982b 100644 --- a/be/src/http/CMakeLists.txt +++ b/be/src/http/CMakeLists.txt @@ -34,7 +34,6 @@ add_library(Webserver STATIC ev_http_server.cpp http_client.cpp action/download_action.cpp - action/mini_load.cpp action/monitor_action.cpp action/health_action.cpp action/tablet_migration_action.cpp diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp deleted file mode 100644 index 53c1464471..0000000000 --- a/be/src/http/action/mini_load.cpp +++ /dev/null @@ -1,966 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "http/action/mini_load.h" - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include "agent/cgroups_mgr.h" -#include "common/status.h" -#include "gen_cpp/FrontendService.h" -#include "gen_cpp/FrontendService_types.h" -#include "gen_cpp/HeartbeatService_types.h" -#include "gen_cpp/MasterService_types.h" -#include "http/http_channel.h" -#include "http/http_headers.h" -#include "http/http_parser.h" -#include "http/http_request.h" -#include "http/http_response.h" -#include "http/http_status.h" -#include "http/utils.h" -#include "olap/file_helper.h" -#include "runtime/client_cache.h" -#include "runtime/exec_env.h" -#include "runtime/fragment_mgr.h" -#include "runtime/load_path_mgr.h" -#include "runtime/stream_load/stream_load_context.h" -#include "service/backend_options.h" -#include "util/file_utils.h" -#include "util/json_util.h" -#include "util/string_parser.hpp" -#include "util/string_util.h" -#include "util/thrift_rpc_helper.h" -#include "util/time.h" -#include "util/url_coding.h" - -namespace doris { - -// context used to handle mini-load in asynchronous mode -struct MiniLoadAsyncCtx { - MiniLoadAsyncCtx(MiniLoadAction* handler_) : handler(handler_) {} - ~MiniLoadAsyncCtx() { - if (need_remove_handle) { - handler->erase_handle(load_handle); - } - if (fd >= 0) { - ::close(fd); - } - } - - MiniLoadAction* handler; - - // used to check duplicate - LoadHandle load_handle; - bool need_remove_handle = false; - - // file to save - std::string file_path; - int fd = -1; - - size_t body_bytes = 0; - size_t bytes_written = 0; - - TLoadCheckRequest load_check_req; -}; - -struct MiniLoadCtx { - MiniLoadCtx(bool is_streaming_) : is_streaming(is_streaming_) {} - - bool is_streaming = false; - MiniLoadAsyncCtx* mini_load_async_ctx = nullptr; - StreamLoadContext* stream_load_ctx = nullptr; -}; - -const std::string CLUSTER_KEY = "cluster"; -const std::string DB_KEY = "db"; -const std::string TABLE_KEY = "table"; -const std::string LABEL_KEY = "label"; -const std::string SUB_LABEL_KEY = "sub_label"; -const std::string FILE_PATH_KEY = "file_path"; -const std::string COLUMNS_KEY = "columns"; -const std::string HLL_KEY = "hll"; -const std::string COLUMN_SEPARATOR_KEY = "column_separator"; -const std::string MAX_FILTER_RATIO_KEY = "max_filter_ratio"; -const std::string STRICT_MODE_KEY = "strict_mode"; -const std::string TIMEOUT_KEY = "timeout"; -const char* k_100_continue = "100-continue"; - -MiniLoadAction::MiniLoadAction(ExecEnv* exec_env) : _exec_env(exec_env) {} - -static bool is_name_valid(const std::string& name) { - return !name.empty(); -} - -static Status check_request(HttpRequest* req) { - auto& params = *req->params(); - - // check params - if (!is_name_valid(params[DB_KEY])) { - return Status::InternalError("Database name is not valid."); - } - if (!is_name_valid(params[TABLE_KEY])) { - return Status::InternalError("Table name is not valid."); - } - if (!is_name_valid(params[LABEL_KEY])) { - return Status::InternalError("Label name is not valid."); - } - - return Status::OK(); -} - -Status MiniLoadAction::data_saved_dir(const LoadHandle& desc, const std::string& table, - std::string* file_path) { - std::string prefix; - RETURN_IF_ERROR(_exec_env->load_path_mgr()->allocate_dir(desc.db, desc.label, &prefix)); - timeval tv; - gettimeofday(&tv, nullptr); - struct tm tm; - time_t cur_sec = tv.tv_sec; - localtime_r(&cur_sec, &tm); - char buf[64]; - strftime(buf, 64, "%Y%m%d%H%M%S", &tm); - - std::stringstream ss; - ss << prefix << "/" << table << "." << desc.sub_label << "." << buf << "." << tv.tv_usec; - *file_path = ss.str(); - return Status::OK(); -} - -Status MiniLoadAction::_load(HttpRequest* http_req, const std::string& file_path, - const std::string& user, const std::string& cluster, - int64_t file_size) { - // Prepare request parameters. - std::map params(http_req->query_params().begin(), - http_req->query_params().end()); - RETURN_IF_ERROR(_merge_header(http_req, ¶ms)); - params.erase(LABEL_KEY); - params.erase(SUB_LABEL_KEY); - - // put here to log master information - const TNetworkAddress& master_address = _exec_env->master_info()->network_address; - Status status; - FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address, - config::thrift_rpc_timeout_ms, &status); - if (!status.ok()) { - std::stringstream ss; - ss << "Connect master failed, with address(" << master_address.hostname << ":" - << master_address.port << ")"; - LOG(WARNING) << ss.str(); - return status; - } - TFeResult res; - try { - TMiniLoadRequest req; - req.protocolVersion = FrontendServiceVersion::V1; - req.__set_db(http_req->param(DB_KEY)); - if (!cluster.empty()) { - req.__set_cluster(cluster); - } - req.__set_tbl(http_req->param(TABLE_KEY)); - req.__set_label(http_req->param(LABEL_KEY)); - req.__set_user(user); - // Belong to a multi-load transaction - if (!http_req->param(SUB_LABEL_KEY).empty()) { - req.__set_subLabel(http_req->param(SUB_LABEL_KEY)); - } - req.__set_properties(params); - req.files.push_back(file_path); - req.__isset.file_size = true; - req.file_size.push_back(file_size); - req.backend.__set_hostname(BackendOptions::get_localhost()); - req.backend.__set_port(config::be_port); - - req.__set_timestamp(GetCurrentTimeMicros()); - try { - client->miniLoad(res, req); - } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":" - << master_address.port << ") because: " << e.what(); - status = client.reopen(config::thrift_rpc_timeout_ms); - if (!status.ok()) { - LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname - << ":" << master_address.port << ")"; - return status; - } - client->miniLoad(res, req); - } catch (apache::thrift::TApplicationException& e) { - LOG(WARNING) << "mini load request from master(" << master_address.hostname << ":" - << master_address.port << ") got unknown result: " << e.what(); - - status = client.reopen(config::thrift_rpc_timeout_ms); - if (!status.ok()) { - LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname - << ":" << master_address.port << ")"; - return status; - } - client->miniLoad(res, req); - } - } catch (apache::thrift::TException& e) { - // failed when retry. - // reopen to disable this connection - client.reopen(config::thrift_rpc_timeout_ms); - std::stringstream ss; - ss << "Request miniload from master(" << master_address.hostname << ":" - << master_address.port << ") because: " << e.what(); - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - - return Status(res.status); -} - -Status MiniLoadAction::_merge_header(HttpRequest* http_req, - std::map* params) { - if (http_req == nullptr || params == nullptr) { - return Status::OK(); - } - if (!http_req->header(HTTP_FORMAT_KEY).empty()) { - (*params)[HTTP_FORMAT_KEY] = http_req->header(HTTP_FORMAT_KEY); - } - if (!http_req->header(HTTP_COLUMNS).empty()) { - (*params)[HTTP_COLUMNS] = http_req->header(HTTP_COLUMNS); - } - if (!http_req->header(HTTP_WHERE).empty()) { - (*params)[HTTP_WHERE] = http_req->header(HTTP_WHERE); - } - if (!http_req->header(HTTP_COLUMN_SEPARATOR).empty()) { - (*params)[HTTP_COLUMN_SEPARATOR] = http_req->header(HTTP_COLUMN_SEPARATOR); - } - if (!http_req->header(HTTP_PARTITIONS).empty()) { - (*params)[HTTP_PARTITIONS] = http_req->header(HTTP_PARTITIONS); - if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) { - return Status::InvalidArgument( - "Can not specify both partitions and temporary partitions"); - } - } - if (!http_req->header(HTTP_TEMP_PARTITIONS).empty()) { - (*params)[HTTP_TEMP_PARTITIONS] = http_req->header(HTTP_TEMP_PARTITIONS); - if (!http_req->header(HTTP_PARTITIONS).empty()) { - return Status::InvalidArgument( - "Can not specify both partitions and temporary partitions"); - } - } - if (!http_req->header(HTTP_NEGATIVE).empty() && - iequal(http_req->header(HTTP_NEGATIVE), "true")) { - (*params)[HTTP_NEGATIVE] = "true"; - } else { - (*params)[HTTP_NEGATIVE] = "false"; - } - if (!http_req->header(HTTP_STRICT_MODE).empty()) { - if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) { - (*params)[HTTP_STRICT_MODE] = "false"; - } else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) { - (*params)[HTTP_STRICT_MODE] = "true"; - } else { - return Status::InvalidArgument("Invalid strict mode format. Must be bool type"); - } - } - if (!http_req->header(HTTP_TIMEZONE).empty()) { - (*params)[HTTP_TIMEZONE] = http_req->header(HTTP_TIMEZONE); - } - if (!http_req->header(HTTP_EXEC_MEM_LIMIT).empty()) { - (*params)[HTTP_EXEC_MEM_LIMIT] = http_req->header(HTTP_EXEC_MEM_LIMIT); - } - if (!http_req->header(HTTP_JSONPATHS).empty()) { - (*params)[HTTP_JSONPATHS] = http_req->header(HTTP_JSONPATHS); - } - if (!http_req->header(HTTP_JSONROOT).empty()) { - (*params)[HTTP_JSONROOT] = http_req->header(HTTP_JSONROOT); - } - if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) { - if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) { - (*params)[HTTP_STRIP_OUTER_ARRAY] = "true"; - } else { - (*params)[HTTP_STRIP_OUTER_ARRAY] = "false"; - } - } else { - (*params)[HTTP_STRIP_OUTER_ARRAY] = "false"; - } - if (!http_req->header(HTTP_FUZZY_PARSE).empty()) { - if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) { - (*params)[HTTP_FUZZY_PARSE] = "true"; - } else { - (*params)[HTTP_FUZZY_PARSE] = "false"; - } - } else { - (*params)[HTTP_FUZZY_PARSE] = "false"; - } - if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) { - if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) { - (*params)[HTTP_READ_JSON_BY_LINE] = "true"; - } else { - (*params)[HTTP_READ_JSON_BY_LINE] = "false"; - } - } else { - (*params)[HTTP_READ_JSON_BY_LINE] = "false"; - } - - if (!http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL).empty()) { - (*params)[HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL] = - http_req->header(HTTP_FUNCTION_COLUMN + "." + HTTP_SEQUENCE_COL); - } - if (params->find(HTTP_MERGE_TYPE) == params->end()) { - params->insert(std::make_pair(HTTP_MERGE_TYPE, "APPEND")); - } - StringCaseMap merge_type_map = {{"APPEND", TMergeType::APPEND}, - {"DELETE", TMergeType::DELETE}, - {"MERGE", TMergeType::MERGE}}; - if (!http_req->header(HTTP_MERGE_TYPE).empty()) { - std::string merge_type = http_req->header(HTTP_MERGE_TYPE); - auto it = merge_type_map.find(merge_type); - if (it != merge_type_map.end()) { - (*params)[HTTP_MERGE_TYPE] = it->first; - } else { - return Status::InvalidArgument("Invalid merge type " + merge_type); - } - } - if (!http_req->header(HTTP_DELETE_CONDITION).empty()) { - if ((*params)[HTTP_MERGE_TYPE] == "MERGE") { - (*params)[HTTP_DELETE_CONDITION] = http_req->header(HTTP_DELETE_CONDITION); - } else { - return Status::InvalidArgument("not support delete when merge type is " + - (*params)[HTTP_MERGE_TYPE] + "."); - } - } - return Status::OK(); -} - -static bool parse_auth(const std::string& auth, std::string* user, std::string* passwd, - std::string* cluster) { - std::string decoded_auth; - - if (!base64_decode(auth, &decoded_auth)) { - return false; - } - std::string::size_type pos = decoded_auth.find(':'); - if (pos == std::string::npos) { - return false; - } - user->assign(decoded_auth.c_str(), pos); - passwd->assign(decoded_auth.c_str() + pos + 1); - const std::string::size_type cluster_pos = user->find('@'); - if (cluster_pos != std::string::npos) { - cluster->assign(user->c_str(), cluster_pos + 1, pos - cluster_pos - 1); - user->assign(user->c_str(), cluster_pos); - } - return true; -} - -Status MiniLoadAction::check_auth(const HttpRequest* http_req, - const TLoadCheckRequest& check_load_req) { - // put here to log master information - const TNetworkAddress& master_address = _exec_env->master_info()->network_address; - Status status; - FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address, - config::thrift_rpc_timeout_ms, &status); - if (!status.ok()) { - std::stringstream ss; - ss << "Connect master failed, with address(" << master_address.hostname << ":" - << master_address.port << ")"; - LOG(WARNING) << ss.str(); - return status; - } - - TFeResult res; - try { - try { - client->loadCheck(res, check_load_req); - } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "Retrying mini load from master(" << master_address.hostname << ":" - << master_address.port << ") because: " << e.what(); - status = client.reopen(config::thrift_rpc_timeout_ms); - if (!status.ok()) { - LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname - << ":" << master_address.port << ")"; - return status; - } - client->loadCheck(res, check_load_req); - } catch (apache::thrift::TApplicationException& e) { - LOG(WARNING) << "load check request from master(" << master_address.hostname << ":" - << master_address.port << ") got unknown result: " << e.what(); - - status = client.reopen(config::thrift_rpc_timeout_ms); - if (!status.ok()) { - LOG(WARNING) << "Client reopen failed. with address(" << master_address.hostname - << ":" << master_address.port << ")"; - return status; - } - client->loadCheck(res, check_load_req); - } - } catch (apache::thrift::TException& e) { - // failed when retry. - // reopen to disable this connection - client.reopen(config::thrift_rpc_timeout_ms); - std::stringstream ss; - ss << "Request miniload from master(" << master_address.hostname << ":" - << master_address.port << ") because: " << e.what(); - LOG(WARNING) << ss.str(); - return Status::InternalError(ss.str()); - } - - return Status(res.status); -} - -void MiniLoadAction::erase_handle(const LoadHandle& desc) { - // remove - std::lock_guard l(_lock); - _current_load.erase(desc); -} - -int MiniLoadAction::on_header(HttpRequest* req) { - // check authorization first, make client know what happened - if (req->header(HttpHeaders::AUTHORIZATION).empty()) { - HttpChannel::send_basic_challenge(req, "mini_load"); - return -1; - } - - Status status; - MiniLoadCtx* mini_load_ctx = new MiniLoadCtx(_is_streaming(req)); - req->set_handler_ctx(mini_load_ctx); - if (((MiniLoadCtx*)req->handler_ctx())->is_streaming) { - status = _on_new_header(req); - StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx; - if (ctx != nullptr) { - ctx->status = status; - } - } else { - status = _on_header(req); - } - if (!status.ok()) { - HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, status.get_error_msg()); - return -1; - } - return 0; -} - -bool MiniLoadAction::_is_streaming(HttpRequest* req) { - // multi load must be non-streaming - if (!req->param(SUB_LABEL_KEY).empty()) { - return false; - } - - TIsMethodSupportedRequest request; - request.__set_function_name(_streaming_function_name); - const TNetworkAddress& master_address = _exec_env->master_info()->network_address; - TFeResult res; - Status status = ThriftRpcHelper::rpc( - master_address.hostname, master_address.port, - [&request, &res](FrontendServiceConnection& client) { - client->isMethodSupported(res, request); - }); - if (!status.ok()) { - std::stringstream ss; - ss << "This mini load is not streaming because: " << status.get_error_msg() - << " with address(" << master_address.hostname << ":" << master_address.port << ")"; - LOG(INFO) << ss.str(); - return false; - } - - status = Status(res.status); - if (!status.ok()) { - std::stringstream ss; - ss << "This streaming mini load is not be supportd because: " << status.get_error_msg() - << " with address(" << master_address.hostname << ":" << master_address.port << ")"; - LOG(INFO) << ss.str(); - return false; - } - return true; -} - -Status MiniLoadAction::_on_header(HttpRequest* req) { - size_t body_bytes = 0; - size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; - if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) { - body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); - if (body_bytes > max_body_bytes) { - std::stringstream ss; - ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes; - return Status::InternalError(ss.str()); - } - } else { - evhttp_connection_set_max_body_size( - evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes); - } - - RETURN_IF_ERROR(check_request(req)); - - std::unique_ptr mini_load_async_ctx(new MiniLoadAsyncCtx(this)); - mini_load_async_ctx->body_bytes = body_bytes; - mini_load_async_ctx->load_handle.db = req->param(DB_KEY); - mini_load_async_ctx->load_handle.label = req->param(LABEL_KEY); - mini_load_async_ctx->load_handle.sub_label = req->param(SUB_LABEL_KEY); - - // check if duplicate - // Use this to prevent that two callback function write to one file - // that file may be writen bad - { - std::lock_guard l(_lock); - if (_current_load.find(mini_load_async_ctx->load_handle) != _current_load.end()) { - return Status::InternalError("Duplicate mini load request."); - } - _current_load.insert(mini_load_async_ctx->load_handle); - mini_load_async_ctx->need_remove_handle = true; - } - // generate load check request - RETURN_IF_ERROR(generate_check_load_req(req, &mini_load_async_ctx->load_check_req)); - - // Check auth - RETURN_IF_ERROR(check_auth(req, mini_load_async_ctx->load_check_req)); - - // Receive data first, keep things easy. - RETURN_IF_ERROR(data_saved_dir(mini_load_async_ctx->load_handle, req->param(TABLE_KEY), - &mini_load_async_ctx->file_path)); - // destructor will close the file handle, not depend on DeferOp any more - mini_load_async_ctx->fd = - open(mini_load_async_ctx->file_path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0660); - if (mini_load_async_ctx->fd < 0) { - char buf[64]; - LOG(WARNING) << "open file failed, path=" << mini_load_async_ctx->file_path - << ", errno=" << errno << ", errmsg=" << strerror_r(errno, buf, sizeof(buf)); - return Status::InternalError("open file failed"); - } - - ((MiniLoadCtx*)req->handler_ctx())->mini_load_async_ctx = mini_load_async_ctx.release(); - return Status::OK(); -} - -void MiniLoadAction::on_chunk_data(HttpRequest* http_req) { - MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx(); - if (ctx->is_streaming) { - _on_new_chunk_data(http_req); - } else { - _on_chunk_data(http_req); - } -} - -void MiniLoadAction::_on_chunk_data(HttpRequest* http_req) { - MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx; - if (ctx == nullptr) { - return; - } - - struct evhttp_request* ev_req = http_req->get_evhttp_request(); - auto evbuf = evhttp_request_get_input_buffer(ev_req); - - char buf[4096]; - while (evbuffer_get_length(evbuf) > 0) { - auto n = evbuffer_remove(evbuf, buf, sizeof(buf)); - while (n > 0) { - auto res = write(ctx->fd, buf, n); - if (res < 0) { - char errbuf[64]; - LOG(WARNING) << "write file failed, path=" << ctx->file_path << ", errno=" << errno - << ", errmsg=" << strerror_r(errno, errbuf, sizeof(errbuf)); - HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR, - "write file failed"); - delete ctx; - http_req->set_handler_ctx(nullptr); - return; - } - n -= res; - ctx->bytes_written += res; - } - } -} - -void MiniLoadAction::_on_new_chunk_data(HttpRequest* http_req) { - StreamLoadContext* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->stream_load_ctx; - if (ctx == nullptr || !ctx->status.ok()) { - return; - } - - struct evhttp_request* ev_req = http_req->get_evhttp_request(); - auto evbuf = evhttp_request_get_input_buffer(ev_req); - - while (evbuffer_get_length(evbuf) > 0) { - auto bb = ByteBuffer::allocate(4096); - auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity); - bb->pos = remove_bytes; - bb->flip(); - auto st = ctx->body_sink->append(bb); - if (!st.ok()) { - LOG(WARNING) << "append body content failed. errmsg=" << st.get_error_msg() - << ctx->brief(); - ctx->status = st; - return; - } - ctx->receive_bytes += remove_bytes; - } -} - -void MiniLoadAction::free_handler_ctx(void* param) { - MiniLoadCtx* ctx = (MiniLoadCtx*)param; - if (ctx->is_streaming) { - StreamLoadContext* streaming_ctx = ((MiniLoadCtx*)param)->stream_load_ctx; - if (streaming_ctx != nullptr) { - // sender is going, make receiver know it - if (streaming_ctx->body_sink != nullptr) { - LOG(WARNING) << "cancel stream load " << streaming_ctx->id.to_string() - << " because sender failed"; - streaming_ctx->body_sink->cancel("sender failed"); - } - if (streaming_ctx->unref()) { - delete streaming_ctx; - } - } - } else { - MiniLoadAsyncCtx* async_ctx = ((MiniLoadCtx*)param)->mini_load_async_ctx; - delete async_ctx; - } - delete ctx; -} - -void MiniLoadAction::handle(HttpRequest* http_req) { - MiniLoadCtx* ctx = (MiniLoadCtx*)http_req->handler_ctx(); - if (ctx->is_streaming) { - _new_handle(http_req); - } else { - _handle(http_req); - } -} - -void MiniLoadAction::_handle(HttpRequest* http_req) { - MiniLoadAsyncCtx* ctx = ((MiniLoadCtx*)http_req->handler_ctx())->mini_load_async_ctx; - if (ctx == nullptr) { - // when ctx is nullptr, there must be error happened when on_chunk_data - // and reply is sent, we just return with no operation - LOG(WARNING) << "handler context is nullptr when MiniLoad callback execute, uri=" - << http_req->uri(); - return; - } - if (ctx->body_bytes > 0 && ctx->bytes_written != ctx->body_bytes) { - LOG(WARNING) << "bytes written is not equal with body size, uri=" << http_req->uri() - << ", body_bytes=" << ctx->body_bytes - << ", bytes_written=" << ctx->bytes_written; - HttpChannel::send_reply(http_req, HttpStatus::INTERNAL_SERVER_ERROR, - "receipt size not equal with body size"); - return; - } - auto st = _load(http_req, ctx->file_path, ctx->load_check_req.user, ctx->load_check_req.cluster, - ctx->bytes_written); - std::string str = st.to_json(); - HttpChannel::send_reply(http_req, str); -} - -Status MiniLoadAction::generate_check_load_req(const HttpRequest* http_req, - TLoadCheckRequest* check_load_req) { - const char k_basic[] = "Basic "; - const std::string& auth = http_req->header(HttpHeaders::AUTHORIZATION); - if (auth.compare(0, sizeof(k_basic) - 1, k_basic, sizeof(k_basic) - 1) != 0) { - return Status::InternalError("Not support Basic authorization."); - } - - check_load_req->protocolVersion = FrontendServiceVersion::V1; - // Skip "Basic " - std::string str = auth.substr(sizeof(k_basic) - 1); - std::string cluster; - if (!parse_auth(str, &(check_load_req->user), &(check_load_req->passwd), &cluster)) { - LOG(WARNING) << "parse auth string failed." << auth << " and str " << str; - return Status::InternalError("Parse authorization failed."); - } - if (!cluster.empty()) { - check_load_req->__set_cluster(cluster); - } - check_load_req->db = http_req->param(DB_KEY); - check_load_req->__set_tbl(http_req->param(TABLE_KEY)); - if (http_req->param(SUB_LABEL_KEY).empty()) { - check_load_req->__set_label(http_req->param(LABEL_KEY)); - check_load_req->__set_timestamp(GetCurrentTimeMicros()); - } - - if (http_req->remote_host() != nullptr) { - std::string user_ip(http_req->remote_host()); - check_load_req->__set_user_ip(user_ip); - } - - return Status::OK(); -} - -bool LoadHandleCmp::operator()(const LoadHandle& lhs, const LoadHandle& rhs) const { - int ret = lhs.label.compare(rhs.label); - if (ret < 0) { - return true; - } else if (ret > 0) { - return false; - } - - ret = lhs.sub_label.compare(rhs.sub_label); - if (ret < 0) { - return true; - } else if (ret > 0) { - return false; - } - - ret = lhs.db.compare(rhs.db); - if (ret < 0) { - return true; - } - - return false; -} - -// fe will begin the txn and record the metadata of load -Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) { - // prepare begin mini load request params - TMiniLoadBeginRequest request; - set_request_auth(&request, ctx->auth); - request.db = ctx->db; - request.tbl = ctx->table; - request.label = ctx->label; - if (!ctx->sub_label.empty()) { - request.__set_sub_label(ctx->sub_label); - } - if (ctx->timeout_second != -1) { - request.__set_timeout_second(ctx->timeout_second); - } - if (ctx->max_filter_ratio != 0.0) { - request.__set_max_filter_ratio(ctx->max_filter_ratio); - } - request.__set_create_timestamp(UnixMillis()); - request.__set_request_id(ctx->id.to_thrift()); - // begin load by master - const TNetworkAddress& master_addr = _exec_env->master_info()->network_address; - TMiniLoadBeginResult res; - RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&request, &res](FrontendServiceConnection& client) { - client->miniLoadBegin(res, request); - })); - Status begin_status(res.status); - if (!begin_status.ok()) { - LOG(INFO) << "failed to begin mini load " << ctx->label - << " with error msg:" << begin_status.get_error_msg(); - return begin_status; - } - ctx->txn_id = res.txn_id; - // txn has been begun in fe - ctx->need_rollback = true; - LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe"; - return Status::OK(); -} - -Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) { - // prepare request parameters - TStreamLoadPutRequest put_request; - set_request_auth(&put_request, ctx->auth); - put_request.db = ctx->db; - put_request.tbl = ctx->table; - put_request.txnId = ctx->txn_id; - put_request.formatType = ctx->format; - put_request.__set_loadId(ctx->id.to_thrift()); - put_request.fileType = TFileType::FILE_STREAM; - std::map params(req->query_params().begin(), - req->query_params().end()); - /* merge params of columns and hll - * for example: - * input: columns=c1,tmp_c2,tmp_c3\&hll=hll_c2,tmp_c2:hll_c3,tmp_c3 - * output: columns=c1,tmp_c2,tmp_c3,hll_c2=hll_hash(tmp_c2),hll_c3=hll_hash(tmp_c3) - */ - auto columns_it = params.find(COLUMNS_KEY); - if (columns_it != params.end()) { - std::string columns_value = columns_it->second; - auto hll_it = params.find(HLL_KEY); - if (hll_it != params.end()) { - std::string hll_value = hll_it->second; - if (hll_value.empty()) { - return Status::InvalidArgument( - "Hll value could not be empty when hll key is exists!"); - } - std::map hll_map; - RETURN_IF_ERROR(StringParser::split_string_to_map(hll_value, ":", ",", &hll_map)); - if (hll_map.empty()) { - return Status::InvalidArgument("Hll value could not transform to hll expr: " + - hll_value); - } - for (auto& hll_element : hll_map) { - columns_value += "," + hll_element.first + "=hll_hash(" + hll_element.second + ")"; - } - } - put_request.__set_columns(columns_value); - } - auto column_separator_it = params.find(COLUMN_SEPARATOR_KEY); - if (column_separator_it != params.end()) { - put_request.__set_columnSeparator(column_separator_it->second); - } - if (ctx->timeout_second != -1) { - put_request.__set_timeout(ctx->timeout_second); - } - auto strict_mode_it = params.find(STRICT_MODE_KEY); - if (strict_mode_it != params.end()) { - std::string strict_mode_value = strict_mode_it->second; - if (iequal(strict_mode_value, "false")) { - put_request.__set_strictMode(false); - } else if (iequal(strict_mode_value, "true")) { - put_request.__set_strictMode(true); - } else { - return Status::InvalidArgument("Invalid strict mode format. Must be bool type"); - } - } - - // plan this load - TNetworkAddress master_addr = _exec_env->master_info()->network_address; - RETURN_IF_ERROR(ThriftRpcHelper::rpc( - master_addr.hostname, master_addr.port, - [&put_request, ctx](FrontendServiceConnection& client) { - client->streamLoadPut(ctx->put_result, put_request); - })); - Status plan_status(ctx->put_result.status); - if (!plan_status.ok()) { - LOG(WARNING) << "plan streaming load failed. errmsg=" << plan_status.get_error_msg() - << ctx->brief(); - return plan_status; - } - VLOG_NOTICE << "params is " << apache::thrift::ThriftDebugString(ctx->put_result.params); - return Status::OK(); -} - -// new on_header of mini load -Status MiniLoadAction::_on_new_header(HttpRequest* req) { - size_t body_bytes = 0; - size_t max_body_bytes = config::streaming_load_max_mb * 1024 * 1024; - if (!req->header(HttpHeaders::CONTENT_LENGTH).empty()) { - body_bytes = std::stol(req->header(HttpHeaders::CONTENT_LENGTH)); - if (body_bytes > max_body_bytes) { - std::stringstream ss; - ss << "file size exceed max body size, max_body_bytes=" << max_body_bytes; - return Status::InvalidArgument(ss.str()); - } - } else { - evhttp_connection_set_max_body_size( - evhttp_request_get_connection(req->get_evhttp_request()), max_body_bytes); - } - - RETURN_IF_ERROR(check_request(req)); - - StreamLoadContext* ctx = new StreamLoadContext(_exec_env); - ctx->ref(); - ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx = ctx; - - // auth information - if (!parse_basic_auth(*req, &ctx->auth)) { - LOG(WARNING) << "parse basic authorization failed." << ctx->brief(); - return Status::InvalidArgument("no valid Basic authorization"); - } - - ctx->load_type = TLoadType::MINI_LOAD; - ctx->load_src_type = TLoadSourceType::RAW; - - ctx->db = req->param(DB_KEY); - ctx->table = req->param(TABLE_KEY); - ctx->label = req->param(LABEL_KEY); - if (!req->param(SUB_LABEL_KEY).empty()) { - ctx->sub_label = req->param(SUB_LABEL_KEY); - } - ctx->format = TFileFormatType::FORMAT_CSV_PLAIN; - std::map params(req->query_params().begin(), - req->query_params().end()); - auto max_filter_ratio_it = params.find(MAX_FILTER_RATIO_KEY); - if (max_filter_ratio_it != params.end()) { - ctx->max_filter_ratio = strtod(max_filter_ratio_it->second.c_str(), nullptr); - } - auto timeout_it = params.find(TIMEOUT_KEY); - if (timeout_it != params.end()) { - try { - ctx->timeout_second = std::stoi(timeout_it->second); - } catch (const std::invalid_argument& e) { - return Status::InvalidArgument("Invalid timeout format"); - } - } - - LOG(INFO) << "new income mini load request." << ctx->brief() << ", db: " << ctx->db - << ", tbl: " << ctx->table; - - // record metadata in frontend - RETURN_IF_ERROR(_begin_mini_load(ctx)); - - // open sink - auto pipe = std::make_shared(); - RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(ctx->id, pipe)); - ctx->body_sink = pipe; - - // get plan from fe - RETURN_IF_ERROR(_process_put(req, ctx)); - - // execute plan - return _exec_env->stream_load_executor()->execute_plan_fragment(ctx); -} - -void MiniLoadAction::_new_handle(HttpRequest* req) { - StreamLoadContext* ctx = ((MiniLoadCtx*)req->handler_ctx())->stream_load_ctx; - DCHECK(ctx != nullptr); - - if (ctx->status.ok()) { - ctx->status = _on_new_handle(ctx); - if (!ctx->status.ok()) { - LOG(WARNING) << "handle mini load failed, id=" << ctx->id - << ", errmsg=" << ctx->status.get_error_msg(); - } - } - - // if failed to commit and status is not PUBLISH_TIMEOUT, rollback the txn. - // PUBLISH_TIMEOUT is treated as OK in mini load, because user will use show load stmt - // to see the final result. - if (!ctx->status.ok() && ctx->status.code() != TStatusCode::PUBLISH_TIMEOUT) { - if (ctx->need_rollback) { - _exec_env->stream_load_executor()->rollback_txn(ctx); - ctx->need_rollback = false; - } - if (ctx->body_sink.get() != nullptr) { - ctx->body_sink->cancel(ctx->status.get_error_msg()); - } - } - - std::string str = ctx->to_json_for_mini_load(); - str += '\n'; - HttpChannel::send_reply(req, str); -} - -Status MiniLoadAction::_on_new_handle(StreamLoadContext* ctx) { - if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) { - LOG(WARNING) << "receive body don't equal with body bytes, body_bytes=" << ctx->body_bytes - << ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id; - return Status::InternalError("receive body don't equal with body bytes"); - } - - // wait stream load sink finish - RETURN_IF_ERROR(ctx->body_sink->finish()); - - // wait stream load finish - RETURN_IF_ERROR(ctx->future.get()); - - // commit this load with mini load attachment - RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx)); - - return Status::OK(); -} - -} // namespace doris diff --git a/be/src/http/action/mini_load.h b/be/src/http/action/mini_load.h deleted file mode 100644 index a21a450f19..0000000000 --- a/be/src/http/action/mini_load.h +++ /dev/null @@ -1,109 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "common/status.h" -#include "gen_cpp/FrontendService.h" -#include "http/http_handler.h" -#include "runtime/stream_load/stream_load_context.h" -#include "util/defer_op.h" - -namespace doris { - -// Used to identify one mini load job -struct LoadHandle { - std::string db; - std::string label; - std::string sub_label; -}; - -struct LoadHandleCmp { - bool operator()(const LoadHandle& lhs, const LoadHandle& rhs) const; -}; - -class TMasterResult; -class ExecEnv; -class StreamLoadContext; - -// This a handler for mini load -// path is /api/{db}/{table}/_load -class MiniLoadAction : public HttpHandler { -public: - MiniLoadAction(ExecEnv* exec_env); - - virtual ~MiniLoadAction() {} - - void handle(HttpRequest* req) override; - - bool request_will_be_read_progressively() override { return true; } - - int on_header(HttpRequest* req) override; - - void on_chunk_data(HttpRequest* req) override; - void free_handler_ctx(void* ctx) override; - - void erase_handle(const LoadHandle& handle); - -private: - Status _load(HttpRequest* req, const std::string& file_path, const std::string& user, - const std::string& cluster, int64_t file_size); - - Status data_saved_dir(const LoadHandle& desc, const std::string& table, std::string* file_path); - - Status _on_header(HttpRequest* http_req); - - Status generate_check_load_req(const HttpRequest* http_req, TLoadCheckRequest* load_check_req); - - Status check_auth(const HttpRequest* http_req, const TLoadCheckRequest& load_check_req); - - void _on_chunk_data(HttpRequest* http_req); - - void _handle(HttpRequest* http_req); - - // streaming mini load - Status _on_new_header(HttpRequest* req); - - Status _begin_mini_load(StreamLoadContext* ctx); - - Status _process_put(HttpRequest* req, StreamLoadContext* ctx); - - void _on_new_chunk_data(HttpRequest* http_req); - - void _new_handle(HttpRequest* req); - - Status _on_new_handle(StreamLoadContext* ctx); - - bool _is_streaming(HttpRequest* req); - - Status _merge_header(HttpRequest* http_req, std::map* params); - - const std::string _streaming_function_name = "STREAMING_MINI_LOAD"; - - ExecEnv* _exec_env; - - std::mutex _lock; - // Used to check if load is duplicated in this instance. - std::set _current_load; -}; - -} // namespace doris diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 7692ae933f..2cc56e137b 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -60,7 +60,6 @@ set(RUNTIME_FILES qsorter.cpp fragment_mgr.cpp dpp_sink_internal.cpp - etl_job_mgr.cpp load_path_mgr.cpp types.cpp tmp_file_mgr.cc diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp deleted file mode 100644 index ce4029b144..0000000000 --- a/be/src/runtime/etl_job_mgr.cpp +++ /dev/null @@ -1,302 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/etl_job_mgr.h" - -#include -#include - -#include "gen_cpp/FrontendService.h" -#include "gen_cpp/HeartbeatService_types.h" -#include "gen_cpp/MasterService_types.h" -#include "gen_cpp/Status_types.h" -#include "gen_cpp/Types_types.h" -#include "runtime/client_cache.h" -#include "runtime/exec_env.h" -#include "runtime/fragment_mgr.h" -#include "runtime/plan_fragment_executor.h" -#include "runtime/runtime_state.h" -#include "service/backend_options.h" -#include "util/file_utils.h" -#include "util/uid_util.h" - -namespace doris { - -#define VLOG_ETL VLOG_CRITICAL - -std::string EtlJobMgr::to_http_path(const std::string& file_name) { - std::stringstream url; - url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port - << "/api/_download_load?" - << "token=" << _exec_env->token() << "&file=" << file_name; - return url.str(); -} - -std::string EtlJobMgr::to_load_error_http_path(const std::string& file_name) { - std::stringstream url; - url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port - << "/api/_load_error_log?" - << "file=" << file_name; - return url.str(); -} - -const std::string DPP_NORMAL_ALL = "dpp.norm.ALL"; -const std::string DPP_ABNORMAL_ALL = "dpp.abnorm.ALL"; -const std::string ERROR_FILE_PREFIX = "error_log"; - -EtlJobMgr::EtlJobMgr(ExecEnv* exec_env) - : _exec_env(exec_env), _success_jobs(5000), _failed_jobs(5000) {} - -EtlJobMgr::~EtlJobMgr() {} - -Status EtlJobMgr::init() { - return Status::OK(); -} - -Status EtlJobMgr::start_job(const TMiniLoadEtlTaskRequest& req) { - const TUniqueId& id = req.params.params.fragment_instance_id; - std::lock_guard l(_lock); - auto it = _running_jobs.find(id); - if (it != _running_jobs.end()) { - // Already have this job, return what??? - LOG(INFO) << "Duplicated etl job(" << id << ")"; - return Status::OK(); - } - - // If already success, we return Status::OK() - // and wait master ask me success information - if (_success_jobs.exists(id)) { - // Already success - LOG(INFO) << "Already successful etl job(" << id << ")"; - return Status::OK(); - } - - RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment( - req.params, std::bind(&EtlJobMgr::finalize_job, this, std::placeholders::_1))); - - // redo this job if failed before - if (_failed_jobs.exists(id)) { - _failed_jobs.erase(id); - } - - VLOG_ETL << "Job id(" << id << ") insert to EtlJobMgr."; - _running_jobs.insert(id); - - return Status::OK(); -} - -void EtlJobMgr::report_to_master(PlanFragmentExecutor* executor) { - TUpdateMiniEtlTaskStatusRequest request; - RuntimeState* state = executor->runtime_state(); - request.protocolVersion = FrontendServiceVersion::V1; - request.etlTaskId = state->fragment_instance_id(); - Status status = get_job_state(state->fragment_instance_id(), &request.etlTaskStatus); - if (!status.ok()) { - return; - } - const TNetworkAddress& master_address = _exec_env->master_info()->network_address; - FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address, - config::thrift_rpc_timeout_ms, &status); - if (!status.ok()) { - std::stringstream ss; - ss << "Connect master failed, with address(" << master_address.hostname << ":" - << master_address.port << ")"; - LOG(WARNING) << ss.str(); - return; - } - TFeResult res; - try { - try { - client->updateMiniEtlTaskStatus(res, request); - } catch (apache::thrift::transport::TTransportException& e) { - LOG(WARNING) << "Retrying report etl jobs status to master(" << master_address.hostname - << ":" << master_address.port << ") because: " << e.what(); - status = client.reopen(config::thrift_rpc_timeout_ms); - if (!status.ok()) { - LOG(WARNING) << "Client repoen failed. with address(" << master_address.hostname - << ":" << master_address.port << ")"; - return; - } - client->updateMiniEtlTaskStatus(res, request); - } - } catch (apache::thrift::TException& e) { - // failed when retry. - // reopen to disable this connection - client.reopen(config::thrift_rpc_timeout_ms); - std::stringstream ss; - ss << "Report etl task to master(" << master_address.hostname << ":" << master_address.port - << ") failed because: " << e.what(); - LOG(WARNING) << ss.str(); - } - // TODO(lingbin): check status of 'res' here. - // because there are some checks in updateMiniEtlTaskStatus, for example max_filter_ratio. - LOG(INFO) << "Successfully report elt job status to master.id=" << print_id(request.etlTaskId); -} - -void EtlJobMgr::finalize_job(PlanFragmentExecutor* executor) { - EtlJobResult result; - - RuntimeState* state = executor->runtime_state(); - if (executor->status().ok()) { - // Get files - for (auto& it : state->output_files()) { - int64_t file_size = std::filesystem::file_size(it); - result.file_map[to_http_path(it)] = file_size; - } - // set statistics - result.process_normal_rows = state->num_rows_load_success(); - result.process_abnormal_rows = state->num_rows_load_filtered(); - } else { - // get debug path - result.process_normal_rows = state->num_rows_load_success(); - result.process_abnormal_rows = state->num_rows_load_filtered(); - } - - result.debug_path = state->get_error_log_file_path(); - - finish_job(state->fragment_instance_id(), executor->status(), result); - - // Try to report this finished task to master - report_to_master(executor); -} - -Status EtlJobMgr::cancel_job(const TUniqueId& id) { - std::lock_guard l(_lock); - auto it = _running_jobs.find(id); - if (it == _running_jobs.end()) { - // Nothing to do - LOG(INFO) << "No such job id, just print to info " << id; - return Status::OK(); - } - _running_jobs.erase(it); - VLOG_ETL << "id(" << id << ") have been removed from EtlJobMgr."; - EtlJobCtx job_ctx; - job_ctx.finish_status = Status::Cancelled("Cancelled"); - _failed_jobs.put(id, job_ctx); - return Status::OK(); -} - -Status EtlJobMgr::finish_job(const TUniqueId& id, const Status& finish_status, - const EtlJobResult& result) { - std::lock_guard l(_lock); - - auto it = _running_jobs.find(id); - if (it == _running_jobs.end()) { - std::stringstream ss; - ss << "Unknown job id(" << id << ")."; - return Status::InternalError(ss.str()); - } - _running_jobs.erase(it); - - EtlJobCtx ctx; - ctx.finish_status = finish_status; - ctx.result = result; - if (finish_status.ok()) { - _success_jobs.put(id, ctx); - } else { - _failed_jobs.put(id, ctx); - } - - VLOG_ETL << "Move job(" << id << ") from running to " - << (finish_status.ok() ? "success jobs" : "failed jobs"); - - return Status::OK(); -} - -Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result) { - std::lock_guard l(_lock); - auto it = _running_jobs.find(id); - if (it != _running_jobs.end()) { - result->status.__set_status_code(TStatusCode::OK); - result->__set_etl_state(TEtlState::RUNNING); - return Status::OK(); - } - // Successful - if (_success_jobs.exists(id)) { - EtlJobCtx ctx; - _success_jobs.get(id, &ctx); - result->status.__set_status_code(TStatusCode::OK); - result->__set_etl_state(TEtlState::FINISHED); - result->__set_file_map(ctx.result.file_map); - - // set counter - std::map counter; - counter[DPP_NORMAL_ALL] = std::to_string(ctx.result.process_normal_rows); - counter[DPP_ABNORMAL_ALL] = std::to_string(ctx.result.process_abnormal_rows); - result->__set_counters(counter); - - if (!ctx.result.debug_path.empty()) { - result->__set_tracking_url(to_load_error_http_path(ctx.result.debug_path)); - } - return Status::OK(); - } - // failed information - if (_failed_jobs.exists(id)) { - EtlJobCtx ctx; - _failed_jobs.get(id, &ctx); - result->status.__set_status_code(TStatusCode::OK); - result->__set_etl_state(TEtlState::CANCELLED); - - if (!ctx.result.debug_path.empty()) { - result->__set_tracking_url(to_http_path(ctx.result.debug_path)); - } - return Status::OK(); - } - // NO this jobs - result->status.__set_status_code(TStatusCode::OK); - result->__set_etl_state(TEtlState::CANCELLED); - return Status::OK(); -} - -Status EtlJobMgr::erase_job(const TDeleteEtlFilesRequest& req) { - std::lock_guard l(_lock); - const TUniqueId& id = req.mini_load_id; - auto it = _running_jobs.find(id); - if (it != _running_jobs.end()) { - std::stringstream ss; - ss << "Job(" << id << ") is running, can not be deleted."; - return Status::InternalError(ss.str()); - } - _success_jobs.erase(id); - _failed_jobs.erase(id); - - return Status::OK(); -} - -void EtlJobMgr::debug(std::stringstream& ss) { - // Make things easy - std::lock_guard l(_lock); - - // Debug summary - ss << "we have " << _running_jobs.size() << " jobs Running\n"; - ss << "we have " << _failed_jobs.size() << " jobs Failed\n"; - ss << "we have " << _success_jobs.size() << " jobs Successful\n"; - // Debug running jobs - for (auto& it : _running_jobs) { - ss << "running jobs: " << it << "\n"; - } - // Debug success jobs - for (auto& it : _success_jobs) { - ss << "successful jobs: " << it.first << "\n"; - } - // Debug failed jobs - for (auto& it : _failed_jobs) { - ss << "failed jobs: " << it.first << "\n"; - } -} - -} // namespace doris diff --git a/be/src/runtime/etl_job_mgr.h b/be/src/runtime/etl_job_mgr.h deleted file mode 100644 index 306cf0397f..0000000000 --- a/be/src/runtime/etl_job_mgr.h +++ /dev/null @@ -1,99 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include -#include -#include -#include - -#include "common/status.h" -#include "gen_cpp/Types_types.h" -#include "http/rest_monitor_iface.h" -#include "util/hash_util.hpp" -#include "util/lru_cache.hpp" - -namespace doris { - -// used to report to master -struct EtlJobResult { - EtlJobResult() : process_normal_rows(0), process_abnormal_rows(0) {} - std::string debug_path; - std::map file_map; - int64_t process_normal_rows; - int64_t process_abnormal_rows; -}; - -// used to report to master -struct EtlJobCtx { - Status finish_status; - EtlJobResult result; -}; - -class TMiniLoadEtlStatusResult; -class TMiniLoadEtlTaskRequest; -class ExecEnv; -class PlanFragmentExecutor; -class TDeleteEtlFilesRequest; - -// manager of all the Etl job -// used this because master may loop be to check if a load job is finished. -class EtlJobMgr : public RestMonitorIface { -public: - EtlJobMgr(ExecEnv* exec_env); - - virtual ~EtlJobMgr(); - - // make trash directory for collect - Status init(); - - // Make a job to running state - // If this job is successful, return OK - // If this job is failed, move this job from _failed_jobs to _running_jobs - // Otherwise, put it to _running_jobs - Status start_job(const TMiniLoadEtlTaskRequest& req); - - // Make a running job to failed job - Status cancel_job(const TUniqueId& id); - - Status finish_job(const TUniqueId& id, const Status& finish_status, const EtlJobResult& result); - - Status get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result); - - Status erase_job(const TDeleteEtlFilesRequest& req); - - void finalize_job(PlanFragmentExecutor* executor); - - virtual void debug(std::stringstream& ss); - -private: - std::string to_http_path(const std::string& file_path); - std::string to_load_error_http_path(const std::string& file_path); - - void report_to_master(PlanFragmentExecutor* executor); - - ExecEnv* _exec_env; - std::mutex _lock; - std::unordered_set _running_jobs; - LruCache _success_jobs; - LruCache _failed_jobs; -}; - -} // namespace doris diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 6fda2e6a1f..e0e09ae047 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -128,7 +128,6 @@ public: FragmentMgr* fragment_mgr() { return _fragment_mgr; } ResultCache* result_cache() { return _result_cache; } TMasterInfo* master_info() { return _master_info; } - EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; } LoadPathMgr* load_path_mgr() { return _load_path_mgr; } DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; } TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; } @@ -207,7 +206,6 @@ private: FragmentMgr* _fragment_mgr = nullptr; ResultCache* _result_cache = nullptr; TMasterInfo* _master_info = nullptr; - EtlJobMgr* _etl_job_mgr = nullptr; LoadPathMgr* _load_path_mgr = nullptr; DiskIoMgr* _disk_io_mgr = nullptr; TmpFileMgr* _tmp_file_mgr = nullptr; diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 264565e4d6..2347d2bea8 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -31,7 +31,6 @@ #include "runtime/client_cache.h" #include "runtime/data_stream_mgr.h" #include "runtime/disk_io_mgr.h" -#include "runtime/etl_job_mgr.h" #include "runtime/exec_env.h" #include "runtime/external_scan_context_mgr.h" #include "runtime/fold_constant_executor.h" @@ -128,7 +127,6 @@ Status ExecEnv::_init(const std::vector& store_paths) { _result_cache = new ResultCache(config::query_cache_max_size_mb, config::query_cache_elasticity_size_mb); _master_info = new TMasterInfo(); - _etl_job_mgr = new EtlJobMgr(this); _load_path_mgr = new LoadPathMgr(this); _disk_io_mgr = new DiskIoMgr(); _tmp_file_mgr = new TmpFileMgr(this); @@ -147,7 +145,6 @@ Status ExecEnv::_init(const std::vector& store_paths) { _broker_client_cache->init_metrics("broker"); _result_mgr->init(); _cgroups_mgr->init_cgroups(); - _etl_job_mgr->init(); Status status = _load_path_mgr->init(); if (!status.ok()) { LOG(ERROR) << "load path mgr init failed." << status.get_error_msg(); @@ -335,7 +332,6 @@ void ExecEnv::_destroy() { SAFE_DELETE(_tmp_file_mgr); SAFE_DELETE(_disk_io_mgr); SAFE_DELETE(_load_path_mgr); - SAFE_DELETE(_etl_job_mgr); SAFE_DELETE(_master_info); SAFE_DELETE(_fragment_mgr); SAFE_DELETE(_cgroups_mgr); diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 4f080318b8..37491fc98e 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -335,17 +335,7 @@ bool StreamLoadExecutor::collect_load_stat(StreamLoadContext* ctx, TTxnCommitAtt } switch (ctx->load_type) { case TLoadType::MINI_LOAD: { - attach->loadType = TLoadType::MINI_LOAD; - - TMiniLoadTxnCommitAttachment ml_attach; - ml_attach.loadedRows = ctx->number_loaded_rows; - ml_attach.filteredRows = ctx->number_filtered_rows; - if (!ctx->error_url.empty()) { - ml_attach.__set_errorLogUrl(ctx->error_url); - } - - attach->mlTxnCommitAttachment = std::move(ml_attach); - attach->__isset.mlTxnCommitAttachment = true; + LOG(FATAL) << "mini load is not supported any more"; break; } case TLoadType::ROUTINE_LOAD: { diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index bb47fe9e4e..c3f1261a63 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -36,10 +36,6 @@ class ThriftServer; class TAgentResult; class TAgentTaskRequest; class TAgentPublishRequest; -class TMiniLoadEtlTaskRequest; -class TMiniLoadEtlStatusResult; -class TMiniLoadEtlStatusRequest; -class TDeleteEtlFilesRequest; class TPlanExecRequest; class TPlanExecParams; class TExecPlanFragmentParams; @@ -97,23 +93,6 @@ public: _agent_server->publish_cluster_state(result, request); } - virtual void submit_etl_task(TAgentResult& result, - const TMiniLoadEtlTaskRequest& request) override { - VLOG_RPC << "submit_etl_task. request is " - << apache::thrift::ThriftDebugString(request).c_str(); - _agent_server->submit_etl_task(result, request); - } - - virtual void get_etl_status(TMiniLoadEtlStatusResult& result, - const TMiniLoadEtlStatusRequest& request) override { - _agent_server->get_etl_status(result, request); - } - - virtual void delete_etl_files(TAgentResult& result, - const TDeleteEtlFilesRequest& request) override { - _agent_server->delete_etl_files(result, request); - } - // DorisServer service virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) override; diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 3ce77f6bb6..80dd2612b6 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -25,7 +25,6 @@ #include "http/action/health_action.h" #include "http/action/meta_action.h" #include "http/action/metrics_action.h" -#include "http/action/mini_load.h" #include "http/action/pprof_actions.h" #include "http/action/reload_tablet_action.h" #include "http/action/reset_rpc_channel_action.h" @@ -57,9 +56,9 @@ Status HttpService::start() { add_default_path_handlers(_web_page_handler.get(), MemTracker::get_process_tracker()); // register load - MiniLoadAction* miniload_action = _pool.add(new MiniLoadAction(_env)); - _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", miniload_action); StreamLoadAction* streamload_action = _pool.add(new StreamLoadAction(_env)); + _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_load", + streamload_action); _ev_http_server->register_handler(HttpMethod::PUT, "/api/{db}/{table}/_stream_load", streamload_action); StreamLoad2PCAction* streamload_2pc_action = _pool.add(new StreamLoad2PCAction(_env)); diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt index 879b6f7b24..49c14c49f6 100644 --- a/be/test/CMakeLists.txt +++ b/be/test/CMakeLists.txt @@ -215,7 +215,6 @@ set(RUNTIME_TEST_FILES # runtime/dpp_sink_internal_test.cpp # runtime/dpp_sink_test.cpp # runtime/data_spliter_test.cpp - # runtime/etl_job_mgr_test.cpp # runtime/tmp_file_mgr_test.cpp # runtime/disk_io_mgr_test.cpp # runtime/thread_resource_mgr_test.cpp diff --git a/be/test/runtime/data_stream_test.cpp b/be/test/runtime/data_stream_test.cpp index 39d7cba249..b0a8de6529 100644 --- a/be/test/runtime/data_stream_test.cpp +++ b/be/test/runtime/data_stream_test.cpp @@ -94,15 +94,6 @@ public: virtual void publish_cluster_state(TAgentResult& return_val, const TAgentPublishRequest& request) {} - virtual void submit_etl_task(TAgentResult& return_val, const TMiniLoadEtlTaskRequest& request) { - } - - virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val, - const TMiniLoadEtlStatusRequest& request) {} - - virtual void delete_etl_files(TAgentResult& return_val, const TDeleteEtlFilesRequest& request) { - } - virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id, const int32_t num_senders) {} diff --git a/be/test/runtime/etl_job_mgr_test.cpp b/be/test/runtime/etl_job_mgr_test.cpp deleted file mode 100644 index edb8d140c3..0000000000 --- a/be/test/runtime/etl_job_mgr_test.cpp +++ /dev/null @@ -1,221 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/etl_job_mgr.h" - -#include - -#include "gen_cpp/Types_types.h" -#include "runtime/exec_env.h" -#include "runtime/fragment_mgr.h" -#include "util/cpu_info.h" - -namespace doris { -// Mock fragment mgr -Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) { - return Status::OK(); -} - -FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _thread_pool(10, 128) {} - -FragmentMgr::~FragmentMgr() {} - -void FragmentMgr::debug(std::stringstream& ss) {} - -class EtlJobMgrTest : public testing::Test { -public: - EtlJobMgrTest() {} - -private: - ExecEnv _exec_env; -}; - -TEST_F(EtlJobMgrTest, NormalCase) { - EtlJobMgr mgr(&_exec_env); - TUniqueId id; - id.lo = 1; - id.hi = 1; - - TMiniLoadEtlStatusResult res; - TMiniLoadEtlTaskRequest req; - TDeleteEtlFilesRequest del_req; - del_req.mini_load_id = id; - req.params.params.fragment_instance_id = id; - - // make it running - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // make it finishing - EtlJobResult job_result; - job_result.file_map["abc"] = 100L; - EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::FINISHED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - EXPECT_EQ(1, res.file_map.size()); - EXPECT_EQ(100, res.file_map["abc"]); - - // erase it - EXPECT_TRUE(mgr.erase_job(del_req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::CANCELLED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); -} - -TEST_F(EtlJobMgrTest, DuplicateCase) { - EtlJobMgr mgr(&_exec_env); - TUniqueId id; - id.lo = 1; - id.hi = 1; - - TMiniLoadEtlStatusResult res; - TMiniLoadEtlTaskRequest req; - req.params.params.fragment_instance_id = id; - - // make it running - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // Put it twice - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); -} - -TEST_F(EtlJobMgrTest, RunAfterSuccess) { - EtlJobMgr mgr(&_exec_env); - TUniqueId id; - id.lo = 1; - id.hi = 1; - - TMiniLoadEtlStatusResult res; - TMiniLoadEtlTaskRequest req; - TDeleteEtlFilesRequest del_req; - del_req.mini_load_id = id; - req.params.params.fragment_instance_id = id; - - // make it running - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // make it finishing - EtlJobResult job_result; - job_result.file_map["abc"] = 100L; - EXPECT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::FINISHED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - EXPECT_EQ(1, res.file_map.size()); - EXPECT_EQ(100, res.file_map["abc"]); - - // Put it twice - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::FINISHED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - EXPECT_EQ(1, res.file_map.size()); - EXPECT_EQ(100, res.file_map["abc"]); -} - -TEST_F(EtlJobMgrTest, RunAfterFail) { - EtlJobMgr mgr(&_exec_env); - TUniqueId id; - id.lo = 1; - id.hi = 1; - - TMiniLoadEtlStatusResult res; - TMiniLoadEtlTaskRequest req; - req.params.params.fragment_instance_id = id; - - // make it running - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // make it finishing - EtlJobResult job_result; - job_result.debug_path = "abc"; - EXPECT_TRUE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::CANCELLED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // Put it twice - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); -} - -TEST_F(EtlJobMgrTest, CancelJob) { - EtlJobMgr mgr(&_exec_env); - TUniqueId id; - id.lo = 1; - id.hi = 1; - - TMiniLoadEtlStatusResult res; - TMiniLoadEtlTaskRequest req; - req.params.params.fragment_instance_id = id; - - // make it running - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // make it finishing - EtlJobResult job_result; - job_result.debug_path = "abc"; - EXPECT_TRUE(mgr.cancel_job(id).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::CANCELLED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); - - // Put it twice - EXPECT_TRUE(mgr.start_job(req).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::RUNNING, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); -} - -TEST_F(EtlJobMgrTest, FinishUnknownJob) { - EtlJobMgr mgr(&_exec_env); - TUniqueId id; - id.lo = 1; - id.hi = 1; - - TMiniLoadEtlStatusResult res; - - // make it finishing - EtlJobResult job_result; - job_result.debug_path = "abc"; - EXPECT_FALSE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok()); - EXPECT_TRUE(mgr.get_job_state(id, &res).ok()); - EXPECT_EQ(TEtlState::CANCELLED, res.etl_state); - EXPECT_EQ(TStatusCode::OK, res.status.status_code); -} - -} // namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index f6b85a0ebe..7278dd53aa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -29,11 +29,9 @@ import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.FunctionParams; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.IsNullPredicate; -import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.PartitionNames; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StorageBackend; @@ -45,7 +43,6 @@ import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.FunctionSet; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.MaterializedIndex.IndexExtState; @@ -88,14 +85,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; -import org.apache.doris.task.AgentClient; import org.apache.doris.task.AgentTaskQueue; import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.task.PushTask; import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TFileFormatType; -import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPriority; import org.apache.doris.transaction.TransactionNotFoundException; @@ -230,140 +225,6 @@ public class Load { lock.writeLock().unlock(); } - // return true if we truly add the load job - // return false otherwise (eg: a retry request) - @Deprecated - public boolean addMiniLoadJob(TMiniLoadRequest request) throws DdlException { - // get params - String fullDbName = request.getDb(); - String tableName = request.getTbl(); - String label = request.getLabel(); - long timestamp = 0; - if (request.isSetTimestamp()) { - timestamp = request.getTimestamp(); - } - TNetworkAddress beAddr = request.getBackend(); - String filePathsValue = request.getFiles().get(0); - Map params = request.getProperties(); - - // create load stmt - // label name - LabelName labelName = new LabelName(fullDbName, label); - - // data descriptions - // file paths - if (Strings.isNullOrEmpty(filePathsValue)) { - throw new DdlException("File paths are not specified"); - } - List filePaths = Arrays.asList(filePathsValue.split(",")); - - // partitions | column names | separator | line delimiter - List partitionNames = null; - List columnNames = null; - Separator columnSeparator = null; - List hllColumnPairList = null; - Separator lineDelimiter = null; - String formatType = null; - if (params != null) { - String specifiedPartitions = params.get(LoadStmt.KEY_IN_PARAM_PARTITIONS); - if (!Strings.isNullOrEmpty(specifiedPartitions)) { - partitionNames = Arrays.asList(specifiedPartitions.split(",")); - } - String specifiedColumns = params.get(LoadStmt.KEY_IN_PARAM_COLUMNS); - if (!Strings.isNullOrEmpty(specifiedColumns)) { - columnNames = Arrays.asList(specifiedColumns.split(",")); - } - - final String hll = params.get(LoadStmt.KEY_IN_PARAM_HLL); - if (!Strings.isNullOrEmpty(hll)) { - hllColumnPairList = Arrays.asList(hll.split(":")); - } - - String columnSeparatorStr = params.get(LoadStmt.KEY_IN_PARAM_COLUMN_SEPARATOR); - if (columnSeparatorStr != null) { - if (columnSeparatorStr.isEmpty()) { - columnSeparatorStr = "\t"; - } - columnSeparator = new Separator(columnSeparatorStr); - try { - columnSeparator.analyze(); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } - } - String lineDelimiterStr = params.get(LoadStmt.KEY_IN_PARAM_LINE_DELIMITER); - if (lineDelimiterStr != null) { - if (lineDelimiterStr.isEmpty()) { - lineDelimiterStr = "\n"; - } - lineDelimiter = new Separator(lineDelimiterStr); - try { - lineDelimiter.analyze(); - } catch (AnalysisException e) { - throw new DdlException(e.getMessage()); - } - } - formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE); - } - - DataDescription dataDescription = new DataDescription( - tableName, - partitionNames != null ? new PartitionNames(false, partitionNames) : null, - filePaths, - columnNames, - columnSeparator, - formatType, - false, - null - ); - dataDescription.setLineDelimiter(lineDelimiter); - dataDescription.setBeAddr(beAddr); - // parse hll param pair - if (hllColumnPairList != null) { - for (int i = 0; i < hllColumnPairList.size(); i++) { - final String pairStr = hllColumnPairList.get(i); - final List pairList = Arrays.asList(pairStr.split(",")); - if (pairList.size() != 2) { - throw new DdlException("hll param format error"); - } - - final String resultColumn = pairList.get(0); - final String hashColumn = pairList.get(1); - final Pair> pair = new Pair>(FunctionSet.HLL_HASH, - Arrays.asList(hashColumn)); - dataDescription.addColumnMapping(resultColumn, pair); - } - } - - List dataDescriptions = Lists.newArrayList(dataDescription); - - // job properties - Map properties = Maps.newHashMap(); - if (params != null) { - String maxFilterRatio = params.get(LoadStmt.MAX_FILTER_RATIO_PROPERTY); - if (!Strings.isNullOrEmpty(maxFilterRatio)) { - properties.put(LoadStmt.MAX_FILTER_RATIO_PROPERTY, maxFilterRatio); - } - String timeout = params.get(LoadStmt.TIMEOUT_PROPERTY); - if (!Strings.isNullOrEmpty(timeout)) { - properties.put(LoadStmt.TIMEOUT_PROPERTY, timeout); - } - } - LoadStmt stmt = new LoadStmt(labelName, dataDescriptions, null, null, properties); - - // try to register mini label - if (!registerMiniLabel(fullDbName, label, timestamp)) { - return false; - } - - try { - addLoadJob(stmt, EtlJobType.MINI, timestamp); - return true; - } finally { - deregisterMiniLabel(fullDbName, label); - } - } - public void addLoadJob(LoadStmt stmt, EtlJobType etlJobType, long timestamp) throws DdlException { // get db String dbName = stmt.getLabel().getDbName(); @@ -2641,24 +2502,6 @@ public class Load { } break; case MINI: - for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) { - long backendId = taskInfo.getBackendId(); - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId); - if (backend == null) { - LOG.warn("backend does not exist. id: {}", backendId); - break; - } - - long dbId = job.getDbId(); - Database db = Catalog.getCurrentInternalCatalog().getDbNullable(dbId); - if (db == null) { - LOG.warn("db does not exist. id: {}", dbId); - break; - } - - AgentClient client = new AgentClient(backend.getHost(), backend.getBePort()); - client.deleteEtlFiles(dbId, job.getId(), db.getFullName(), job.getLabel()); - } break; case INSERT: break; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 17b7af2508..a1b2edefdb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -22,14 +22,12 @@ import org.apache.doris.analysis.CompoundPredicate.Operator; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.Table; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.Config; import org.apache.doris.common.DataQualityException; import org.apache.doris.common.DdlException; -import org.apache.doris.common.DuplicatedRequestException; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.PatternMatcher; @@ -41,9 +39,6 @@ import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.load.FailMsg.CancelType; import org.apache.doris.load.Load; -import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TMiniLoadBeginRequest; -import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.TransactionState; @@ -137,53 +132,6 @@ public class LoadManager implements Writable { .filter(j -> (j.getState() != JobState.FINISHED && j.getState() != JobState.CANCELLED)).count(); } - /** - * This method will be invoked by streaming mini load. - * It will begin the txn of mini load immediately without any scheduler . - * - */ - public long createLoadJobFromMiniLoad(TMiniLoadBeginRequest request) throws UserException { - String cluster = SystemInfoService.DEFAULT_CLUSTER; - if (request.isSetCluster()) { - cluster = request.getCluster(); - } - Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb())); - Table table = database.getTableOrDdlException(request.tbl); - MiniLoadJob loadJob = null; - writeLock(); - try { - loadJob = new MiniLoadJob(database.getId(), table.getId(), request); - // call unprotectedExecute before adding load job. so that if job is not started ok, no need to add. - // NOTICE(cmy): this order is only for Mini Load, because mini load's - // unprotectedExecute() only do beginTxn(). - // for other kind of load job, execute the job after adding job. - // Mini load job must be executed before release write lock. - // Otherwise, the duplicated request maybe get the transaction id before transaction of mini load is begun. - loadJob.beginTxn(); - loadJob.unprotectedExecute(); - createLoadJob(loadJob); - } catch (DuplicatedRequestException e) { - // this is a duplicate request, just return previous txn id - LOG.info("duplicate request for mini load. request id: {}, txn: {}", e.getDuplicatedRequestId(), - e.getTxnId()); - return e.getTxnId(); - } catch (UserException e) { - if (loadJob != null) { - loadJob.cancelJobWithoutCheck(new FailMsg(CancelType.LOAD_RUN_FAIL, e.getMessage()), false, - false /* no need to write edit log, because createLoadJob log is not wrote yet */); - } - throw e; - } finally { - writeUnlock(); - } - - // The persistence of mini load must be the final step of create mini load. - // After mini load was executed, the txn id has been set and state has been changed to loading. - // Those two need to be record in persistence. - Catalog.getCurrentCatalog().getEditLog().logCreateLoadJob(loadJob); - return loadJob.getTransactionId(); - } - /** * This method will be invoked by version1 of broker or hadoop load. * It is used to check the label of v1 and v2 at the same time. @@ -209,31 +157,6 @@ public class LoadManager implements Writable { } } - /** - * This method will be invoked by non-streaming of mini load. - * It is used to check the label of v1 and v2 at the same time. - * Finally, the non-streaming mini load will belongs to load class. - * - * @param request request - * @return if: mini load is a duplicated load, return false. else: return true. - * @deprecated not support mini load - */ - @Deprecated - public boolean createLoadJobV1FromRequest(TMiniLoadRequest request) throws DdlException { - String cluster = SystemInfoService.DEFAULT_CLUSTER; - if (request.isSetCluster()) { - cluster = request.getCluster(); - } - Database database = checkDb(ClusterNamespace.getFullName(cluster, request.getDb())); - writeLock(); - try { - checkLabelUsed(database.getId(), request.getLabel()); - return Catalog.getCurrentCatalog().getLoadInstance().addMiniLoadJob(request); - } finally { - writeUnlock(); - } - } - /** * MultiLoadMgr use. **/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java index 4c59ffdaea..2da78ae1c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadJob.java @@ -20,21 +20,11 @@ package org.apache.doris.load.loadv2; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; -import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.DuplicatedRequestException; -import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.QuotaExceedException; import org.apache.doris.common.io.Text; import org.apache.doris.load.EtlJobType; -import org.apache.doris.service.FrontendOptions; -import org.apache.doris.thrift.TMiniLoadBeginRequest; -import org.apache.doris.transaction.BeginTransactionException; import org.apache.doris.transaction.TransactionState; -import org.apache.doris.transaction.TransactionState.TxnCoordinator; -import org.apache.doris.transaction.TransactionState.TxnSourceType; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -55,22 +45,6 @@ public class MiniLoadJob extends LoadJob { super(EtlJobType.MINI); } - public MiniLoadJob(long dbId, long tableId, TMiniLoadBeginRequest request) throws MetaNotFoundException { - super(EtlJobType.MINI, dbId, request.getLabel()); - this.tableId = tableId; - this.tableName = request.getTbl(); - if (request.isSetTimeoutSecond()) { - setTimeout(request.getTimeoutSecond()); - } - if (request.isSetMaxFilterRatio()) { - setMaxFilterRatio(request.getMaxFilterRatio()); - } - this.createTimestamp = request.getCreateTimestamp(); - this.loadStartTimestamp = createTimestamp; - this.authorizationInfo = gatherAuthInfo(); - this.requestId = request.getRequestId(); - } - @Override public Set getTableNamesForShow() { return Sets.newHashSet(tableName); @@ -87,15 +61,7 @@ public class MiniLoadJob extends LoadJob { } @Override - public void beginTxn() - throws LabelAlreadyUsedException, BeginTransactionException, AnalysisException, DuplicatedRequestException, - QuotaExceedException, MetaNotFoundException { - transactionId = Catalog.getCurrentGlobalTransactionMgr() - .beginTransaction(dbId, Lists.newArrayList(tableId), label, requestId, - new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), - TransactionState.LoadJobSourceType.BACKEND_STREAMING, id, - getTimeout()); - } + public void beginTxn() {} @Override protected void replayTxnAttachment(TransactionState txnState) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java index de66082e1c..50094c91e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MiniLoadTxnCommitAttachment.java @@ -18,7 +18,6 @@ package org.apache.doris.load.loadv2; import org.apache.doris.common.io.Text; -import org.apache.doris.thrift.TMiniLoadTxnCommitAttachment; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TxnCommitAttachment; @@ -36,15 +35,6 @@ public class MiniLoadTxnCommitAttachment extends TxnCommitAttachment { super(TransactionState.LoadJobSourceType.BACKEND_STREAMING); } - public MiniLoadTxnCommitAttachment(TMiniLoadTxnCommitAttachment tMiniLoadTxnCommitAttachment) { - super(TransactionState.LoadJobSourceType.BACKEND_STREAMING); - this.loadedRows = tMiniLoadTxnCommitAttachment.getLoadedRows(); - this.filteredRows = tMiniLoadTxnCommitAttachment.getFilteredRows(); - if (tMiniLoadTxnCommitAttachment.isSetErrorLogUrl()) { - this.errorLogUrl = tMiniLoadTxnCommitAttachment.getErrorLogUrl(); - } - } - public long getLoadedRows() { return loadedRows; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index da945704eb..d837c3e1f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -43,15 +43,12 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; -import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.google.common.collect.Streams; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.awaitility.Awaitility; @@ -64,7 +61,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; // Class used to record state of multi-load operation public class MultiLoadMgr { @@ -107,18 +103,6 @@ public class MultiLoadMgr { Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromMultiStart(fullDbName, label); } - public void load(TMiniLoadRequest request) throws DdlException { - if (CollectionUtils.isNotEmpty(request.getFileSize()) - && request.getFileSize().size() != request.getFiles().size()) { - throw new DdlException("files count and file size count not match: [" + request.getFileSize().size() - + "!=" + request.getFiles().size() + "]"); - } - List> files = Streams.zip(request.getFiles().stream(), - request.getFileSize().stream(), Pair::create).collect(Collectors.toList()); - load(request.getDb(), request.getLabel(), request.getSubLabel(), request.getTbl(), - files, request.getBackend(), request.getProperties(), request.getTimestamp()); - } - // Add one load job private void load(String fullDbName, String label, String subLabel, String table, diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index fc2bc0785c..31e44cd711 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -43,16 +43,10 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.Version; import org.apache.doris.datasource.DataSourceIf; import org.apache.doris.datasource.InternalDataSource; -import org.apache.doris.load.EtlStatus; -import org.apache.doris.load.LoadJob; -import org.apache.doris.load.MiniEtlTaskInfo; import org.apache.doris.master.MasterImpl; import org.apache.doris.metric.MetricRepo; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.StreamLoadPlanner; -import org.apache.doris.plugin.AuditEvent; -import org.apache.doris.plugin.AuditEvent.AuditEventBuilder; -import org.apache.doris.plugin.AuditEvent.EventType; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ConnectProcessor; import org.apache.doris.qe.QeProcessorImpl; @@ -77,10 +71,8 @@ import org.apache.doris.thrift.TGetDbsParams; import org.apache.doris.thrift.TGetDbsResult; import org.apache.doris.thrift.TGetTablesParams; import org.apache.doris.thrift.TGetTablesResult; -import org.apache.doris.thrift.TIsMethodSupportedRequest; import org.apache.doris.thrift.TListPrivilegesResult; import org.apache.doris.thrift.TListTableStatusResult; -import org.apache.doris.thrift.TLoadCheckRequest; import org.apache.doris.thrift.TLoadTxn2PCRequest; import org.apache.doris.thrift.TLoadTxn2PCResult; import org.apache.doris.thrift.TLoadTxnBeginRequest; @@ -92,10 +84,6 @@ import org.apache.doris.thrift.TLoadTxnRollbackResult; import org.apache.doris.thrift.TMasterOpRequest; import org.apache.doris.thrift.TMasterOpResult; import org.apache.doris.thrift.TMasterResult; -import org.apache.doris.thrift.TMiniLoadBeginRequest; -import org.apache.doris.thrift.TMiniLoadBeginResult; -import org.apache.doris.thrift.TMiniLoadEtlStatusResult; -import org.apache.doris.thrift.TMiniLoadRequest; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPrivilegeStatus; import org.apache.doris.thrift.TReportExecStatusParams; @@ -109,9 +97,7 @@ import org.apache.doris.thrift.TStatusCode; import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TStreamLoadPutResult; import org.apache.doris.thrift.TTableStatus; -import org.apache.doris.thrift.TUniqueId; import org.apache.doris.thrift.TUpdateExportTaskStatusRequest; -import org.apache.doris.thrift.TUpdateMiniEtlTaskStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusRequest; import org.apache.doris.thrift.TWaitingTxnStatusResult; import org.apache.doris.transaction.DatabaseTransactionMgr; @@ -121,7 +107,6 @@ import org.apache.doris.transaction.TransactionState.TxnCoordinator; import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TxnCommitAttachment; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -130,8 +115,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.thrift.TException; -import java.net.InetAddress; -import java.net.UnknownHostException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -479,197 +462,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { return masterImpl.fetchResource(); } - @Deprecated - @Override - public TFeResult miniLoad(TMiniLoadRequest request) throws TException { - LOG.debug("receive mini load request: label: {}, db: {}, tbl: {}, backend: {}", - request.getLabel(), request.getDb(), request.getTbl(), request.getBackend()); - - ConnectContext context = new ConnectContext(null); - String cluster = SystemInfoService.DEFAULT_CLUSTER; - if (request.isSetCluster()) { - cluster = request.cluster; - } - - final String fullDbName = ClusterNamespace.getFullName(cluster, request.db); - request.setDb(fullDbName); - context.setCluster(cluster); - context.setDatabase(ClusterNamespace.getFullName(cluster, request.db)); - context.setQualifiedUser(ClusterNamespace.getFullName(cluster, request.user)); - context.setCatalog(Catalog.getCurrentCatalog()); - context.getState().reset(); - context.setThreadLocalInfo(); - - TStatus status = new TStatus(TStatusCode.OK); - TFeResult result = new TFeResult(FrontendServiceVersion.V1, status); - try { - if (request.isSetSubLabel()) { - ExecuteEnv.getInstance().getMultiLoadMgr().load(request); - } else { - // try to add load job, label will be checked here. - if (Catalog.getCurrentCatalog().getLoadManager().createLoadJobV1FromRequest(request)) { - try { - // generate mini load audit log - logMiniLoadStmt(request); - } catch (Exception e) { - LOG.warn("failed log mini load stmt", e); - } - } - } - } catch (UserException e) { - LOG.warn("add mini load error: {}", e.getMessage()); - status.setStatusCode(TStatusCode.ANALYSIS_ERROR); - status.addToErrorMsgs(e.getMessage()); - } catch (Throwable e) { - LOG.warn("unexpected exception when adding mini load", e); - status.setStatusCode(TStatusCode.ANALYSIS_ERROR); - status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); - } finally { - ConnectContext.remove(); - } - - LOG.debug("mini load result: {}", result); - return result; - } - - private void logMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException { - String stmt = getMiniLoadStmt(request); - AuditEvent auditEvent = new AuditEventBuilder().setEventType(EventType.AFTER_QUERY) - .setClientIp(request.user_ip + ":0") - .setUser(request.user) - .setDb(request.db) - .setState(TStatusCode.OK.name()) - .setQueryTime(0) - .setStmt(stmt).build(); - - Catalog.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent); - } - - private String getMiniLoadStmt(TMiniLoadRequest request) throws UnknownHostException { - StringBuilder stringBuilder = new StringBuilder(); - stringBuilder.append("curl --location-trusted -u user:passwd -T "); - - if (request.files.size() == 1) { - stringBuilder.append(request.files.get(0)); - } else if (request.files.size() > 1) { - stringBuilder.append("\"{").append(Joiner.on(",").join(request.files)).append("}\""); - } - - InetAddress masterAddress = FrontendOptions.getLocalHost(); - stringBuilder.append(" http://").append(masterAddress.getHostAddress()).append(":"); - stringBuilder.append(Config.http_port).append("/api/").append(request.db).append("/"); - stringBuilder.append(request.tbl).append("/_load?label=").append(request.label); - - if (!request.properties.isEmpty()) { - stringBuilder.append("&"); - List props = Lists.newArrayList(); - for (Map.Entry entry : request.properties.entrySet()) { - String prop = entry.getKey() + "=" + entry.getValue(); - props.add(prop); - } - stringBuilder.append(Joiner.on("&").join(props)); - } - - return stringBuilder.toString(); - } - - @Override - public TFeResult updateMiniEtlTaskStatus(TUpdateMiniEtlTaskStatusRequest request) throws TException { - TFeResult result = new TFeResult(); - result.setProtocolVersion(FrontendServiceVersion.V1); - TStatus status = new TStatus(TStatusCode.OK); - result.setStatus(status); - - // get job task info - TUniqueId etlTaskId = request.getEtlTaskId(); - long jobId = etlTaskId.getHi(); - long taskId = etlTaskId.getLo(); - LoadJob job = Catalog.getCurrentCatalog().getLoadInstance().getLoadJob(jobId); - if (job == null) { - String failMsg = "job does not exist. id: " + jobId; - LOG.warn(failMsg); - status.setStatusCode(TStatusCode.CANCELLED); - status.addToErrorMsgs(failMsg); - return result; - } - - MiniEtlTaskInfo taskInfo = job.getMiniEtlTask(taskId); - if (taskInfo == null) { - String failMsg = "task info does not exist. task id: " + taskId + ", job id: " + jobId; - LOG.warn(failMsg); - status.setStatusCode(TStatusCode.CANCELLED); - status.addToErrorMsgs(failMsg); - return result; - } - - // update etl task status - TMiniLoadEtlStatusResult statusResult = request.getEtlTaskStatus(); - LOG.debug("load job id: {}, etl task id: {}, status: {}", jobId, taskId, statusResult); - EtlStatus taskStatus = taskInfo.getTaskStatus(); - if (taskStatus.setState(statusResult.getEtlState())) { - if (statusResult.isSetCounters()) { - taskStatus.setCounters(statusResult.getCounters()); - } - if (statusResult.isSetTrackingUrl()) { - taskStatus.setTrackingUrl(statusResult.getTrackingUrl()); - } - if (statusResult.isSetFileMap()) { - taskStatus.setFileMap(statusResult.getFileMap()); - } - } - return result; - } - - @Override - public TMiniLoadBeginResult miniLoadBegin(TMiniLoadBeginRequest request) throws TException { - LOG.debug("receive mini load begin request. label: {}, user: {}, ip: {}", - request.getLabel(), request.getUser(), request.getUserIp()); - - TMiniLoadBeginResult result = new TMiniLoadBeginResult(); - TStatus status = new TStatus(TStatusCode.OK); - result.setStatus(status); - try { - String cluster = SystemInfoService.DEFAULT_CLUSTER; - if (request.isSetCluster()) { - cluster = request.cluster; - } - // step1: check password and privs - checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); - // step2: check label and record metadata in load manager - if (request.isSetSubLabel()) { - // TODO(ml): multi mini load - } else { - // add load metadata in loadManager - result.setTxnId(Catalog.getCurrentCatalog().getLoadManager().createLoadJobFromMiniLoad(request)); - } - return result; - } catch (UserException e) { - status.setStatusCode(TStatusCode.ANALYSIS_ERROR); - status.addToErrorMsgs(e.getMessage()); - return result; - } catch (Throwable e) { - LOG.warn("catch unknown result.", e); - status.setStatusCode(TStatusCode.INTERNAL_ERROR); - status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); - return result; - } - } - - @Override - public TFeResult isMethodSupported(TIsMethodSupportedRequest request) throws TException { - TStatus status = new TStatus(TStatusCode.OK); - TFeResult result = new TFeResult(FrontendServiceVersion.V1, status); - switch (request.getFunctionName()) { - case "STREAMING_MINI_LOAD": - break; - default: - status.setStatusCode(TStatusCode.NOT_IMPLEMENTED_ERROR); - break; - } - return result; - } - @Override public TMasterOpResult forward(TMasterOpRequest params) throws TException { TNetworkAddress clientAddr = getClientAddr(); @@ -723,35 +515,6 @@ public class FrontendServiceImpl implements FrontendService.Iface { } } - @Override - public TFeResult loadCheck(TLoadCheckRequest request) throws TException { - LOG.debug("receive load check request. label: {}, user: {}, ip: {}", - request.getLabel(), request.getUser(), request.getUserIp()); - - TStatus status = new TStatus(TStatusCode.OK); - TFeResult result = new TFeResult(FrontendServiceVersion.V1, status); - try { - String cluster = SystemInfoService.DEFAULT_CLUSTER; - if (request.isSetCluster()) { - cluster = request.cluster; - } - - checkPasswordAndPrivs(cluster, request.getUser(), request.getPasswd(), request.getDb(), - request.getTbl(), request.getUserIp(), PrivPredicate.LOAD); - } catch (UserException e) { - status.setStatusCode(TStatusCode.ANALYSIS_ERROR); - status.addToErrorMsgs(e.getMessage()); - return result; - } catch (Throwable e) { - LOG.warn("catch unknown result.", e); - status.setStatusCode(TStatusCode.INTERNAL_ERROR); - status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage())); - return result; - } - - return result; - } - @Override public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException { String clientAddr = getClientAddrAsString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java index 4eb52dea1d..f90aa67358 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentClient.java @@ -21,14 +21,9 @@ import org.apache.doris.common.ClientPool; import org.apache.doris.common.Status; import org.apache.doris.thrift.BackendService; import org.apache.doris.thrift.TAgentResult; -import org.apache.doris.thrift.TAgentServiceVersion; import org.apache.doris.thrift.TCheckStorageFormatResult; -import org.apache.doris.thrift.TDeleteEtlFilesRequest; import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; -import org.apache.doris.thrift.TMiniLoadEtlStatusRequest; -import org.apache.doris.thrift.TMiniLoadEtlStatusResult; -import org.apache.doris.thrift.TMiniLoadEtlTaskRequest; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TSnapshotRequest; import org.apache.doris.thrift.TStatus; @@ -52,22 +47,6 @@ public class AgentClient { this.port = port; } - public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) { - TAgentResult result = null; - LOG.debug("submit etl task. request: {}", request); - try { - borrowClient(); - // submit etl task - result = client.submitEtlTask(request); - ok = true; - } catch (Exception e) { - LOG.warn("submit etl task error", e); - } finally { - returnClient(); - } - return result; - } - public TAgentResult makeSnapshot(TSnapshotRequest request) { TAgentResult result = null; LOG.debug("submit make snapshot task. request: {}", request); @@ -116,24 +95,6 @@ public class AgentClient { return result; } - public TMiniLoadEtlStatusResult getEtlStatus(long jobId, long taskId) { - TMiniLoadEtlStatusResult result = null; - TMiniLoadEtlStatusRequest request = new TMiniLoadEtlStatusRequest(TAgentServiceVersion.V1, - new TUniqueId(jobId, taskId)); - LOG.debug("get mini load etl task status. request: {}", request); - try { - borrowClient(); - // get etl status - result = client.getEtlStatus(request); - ok = true; - } catch (Exception e) { - LOG.warn("get etl status error", e); - } finally { - returnClient(); - } - return result; - } - public TExportStatusResult getExportStatus(long jobId, long taskId) { TExportStatusResult result = null; TUniqueId request = new TUniqueId(jobId, taskId); @@ -183,22 +144,6 @@ public class AgentClient { return result; } - public void deleteEtlFiles(long dbId, long jobId, String dbName, String label) { - TDeleteEtlFilesRequest request = new TDeleteEtlFilesRequest(TAgentServiceVersion.V1, - new TUniqueId(dbId, jobId), dbName, label); - LOG.debug("delete etl files. request: {}", request); - try { - borrowClient(); - // delete etl files - client.deleteEtlFiles(request); - ok = true; - } catch (Exception e) { - LOG.warn("delete etl files error", e); - } finally { - returnClient(); - } - } - private void borrowClient() throws Exception { // create agent client ok = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java index c167c1a662..4c7d27de5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TxnCommitAttachment.java @@ -47,8 +47,6 @@ public abstract class TxnCommitAttachment implements Writable { switch (txnCommitAttachment.getLoadType()) { case ROUTINE_LOAD: return new RLTaskTxnCommitAttachment(txnCommitAttachment.getRlTaskTxnCommitAttachment()); - case MINI_LOAD: - return new MiniLoadTxnCommitAttachment(txnCommitAttachment.getMlTxnCommitAttachment()); default: return null; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java index 0fa7161a5e..09793577a1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/GenericPoolTest.java @@ -25,7 +25,6 @@ import org.apache.doris.thrift.TAgentTaskRequest; import org.apache.doris.thrift.TCancelPlanFragmentParams; import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TCheckStorageFormatResult; -import org.apache.doris.thrift.TDeleteEtlFilesRequest; import org.apache.doris.thrift.TDiskTrashInfo; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; @@ -33,9 +32,6 @@ import org.apache.doris.thrift.TExportStatusResult; import org.apache.doris.thrift.TExportTaskRequest; import org.apache.doris.thrift.TFetchDataParams; import org.apache.doris.thrift.TFetchDataResult; -import org.apache.doris.thrift.TMiniLoadEtlStatusRequest; -import org.apache.doris.thrift.TMiniLoadEtlStatusResult; -import org.apache.doris.thrift.TMiniLoadEtlTaskRequest; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TResultBatch; import org.apache.doris.thrift.TRoutineLoadTask; @@ -156,21 +152,6 @@ public class GenericPoolTest { return null; } - @Override - public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException { - return null; - } - - @Override - public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException { - return null; - } - - @Override - public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException { - return null; - } - @Override public TAgentResult makeSnapshot(TSnapshotRequest snapshotRequest) throws TException { // TODO Auto-generated method stub diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java index 7c9c513572..e49503ca04 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedBackendFactory.java @@ -34,9 +34,7 @@ import org.apache.doris.thrift.TCancelPlanFragmentParams; import org.apache.doris.thrift.TCancelPlanFragmentResult; import org.apache.doris.thrift.TCheckStorageFormatResult; import org.apache.doris.thrift.TCloneReq; -import org.apache.doris.thrift.TDeleteEtlFilesRequest; import org.apache.doris.thrift.TDiskTrashInfo; -import org.apache.doris.thrift.TEtlState; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentResult; import org.apache.doris.thrift.TExportState; @@ -47,9 +45,6 @@ import org.apache.doris.thrift.TFetchDataResult; import org.apache.doris.thrift.TFinishTaskRequest; import org.apache.doris.thrift.THeartbeatResult; import org.apache.doris.thrift.TMasterInfo; -import org.apache.doris.thrift.TMiniLoadEtlStatusRequest; -import org.apache.doris.thrift.TMiniLoadEtlStatusResult; -import org.apache.doris.thrift.TMiniLoadEtlTaskRequest; import org.apache.doris.thrift.TRoutineLoadTask; import org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TScanCloseParams; @@ -243,21 +238,6 @@ public class MockedBackendFactory { return new TAgentResult(new TStatus(TStatusCode.OK)); } - @Override - public TAgentResult submitEtlTask(TMiniLoadEtlTaskRequest request) throws TException { - return new TAgentResult(new TStatus(TStatusCode.OK)); - } - - @Override - public TMiniLoadEtlStatusResult getEtlStatus(TMiniLoadEtlStatusRequest request) throws TException { - return new TMiniLoadEtlStatusResult(new TStatus(TStatusCode.OK), TEtlState.FINISHED); - } - - @Override - public TAgentResult deleteEtlFiles(TDeleteEtlFilesRequest request) throws TException { - return new TAgentResult(new TStatus(TStatusCode.OK)); - } - @Override public TStatus submitExportTask(TExportTaskRequest request) throws TException { return new TStatus(TStatusCode.OK); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 740db1fd08..83f68834f8 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -409,29 +409,3 @@ struct TAgentPublishRequest { 2: required list updates } -struct TMiniLoadEtlTaskRequest { - 1: required TAgentServiceVersion protocol_version - 2: required PaloInternalService.TExecPlanFragmentParams params -} - -struct TMiniLoadEtlStatusRequest { - 1: required TAgentServiceVersion protocol_version - 2: required Types.TUniqueId mini_load_id -} - -struct TMiniLoadEtlStatusResult { - 1: required Status.TStatus status - 2: required Types.TEtlState etl_state - 3: optional map file_map - 4: optional map counters - 5: optional string tracking_url - // progress -} - -struct TDeleteEtlFilesRequest { - 1: required TAgentServiceVersion protocol_version - 2: required Types.TUniqueId mini_load_id - 3: required string db_name - 4: required string label -} - diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift index 8e534f4677..66a59260fc 100644 --- a/gensrc/thrift/BackendService.thrift +++ b/gensrc/thrift/BackendService.thrift @@ -146,13 +146,6 @@ service BackendService { AgentService.TAgentResult publish_cluster_state(1:AgentService.TAgentPublishRequest request); - AgentService.TAgentResult submit_etl_task(1:AgentService.TMiniLoadEtlTaskRequest request); - - AgentService.TMiniLoadEtlStatusResult get_etl_status( - 1:AgentService.TMiniLoadEtlStatusRequest request); - - AgentService.TAgentResult delete_etl_files(1:AgentService.TDeleteEtlFilesRequest request); - Status.TStatus submit_export_task(1:TExportTaskRequest request); PaloInternalService.TExportStatusResult get_export_status(1:Types.TUniqueId task_id); diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index b2ef1a1143..ff59b203ba 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -411,31 +411,6 @@ struct TFeResult { 2: required Status.TStatus status } -// Submit one table load job -// if subLabel is set, this job belong to a multi-load transaction -struct TMiniLoadRequest { - 1: required FrontendServiceVersion protocolVersion - 2: required string db - 3: required string tbl - 4: required string label - 5: optional string user - 6: required Types.TNetworkAddress backend - 7: required list files - 8: required map properties - 9: optional string subLabel - 10: optional string cluster - 11: optional i64 timestamp - 12: optional string user_ip - 13: optional bool is_retry - 14: optional list file_size -} - -struct TUpdateMiniEtlTaskStatusRequest { - 1: required FrontendServiceVersion protocolVersion - 2: required Types.TUniqueId etlTaskId - 3: required AgentService.TMiniLoadEtlStatusResult etlTaskStatus -} - struct TMasterOpRequest { 1: required string user 2: required string db @@ -483,44 +458,6 @@ struct TMasterOpResult { 4: optional Types.TUniqueId queryId; } -struct TLoadCheckRequest { - 1: required FrontendServiceVersion protocolVersion - 2: required string user - 3: required string passwd - 4: required string db - 5: optional string label - 6: optional string cluster - 7: optional i64 timestamp - 8: optional string user_ip - 9: optional string tbl -} - -struct TMiniLoadBeginRequest { - 1: required string user - 2: required string passwd - 3: optional string cluster - 4: optional string user_ip - 5: required string db - 6: required string tbl - 7: required string label - 8: optional string sub_label - 9: optional i64 timeout_second - 10: optional double max_filter_ratio - 11: optional i64 auth_code - 12: optional i64 create_timestamp - 13: optional Types.TUniqueId request_id - 14: optional string auth_code_uuid -} - -struct TIsMethodSupportedRequest { - 1: optional string function_name -} - -struct TMiniLoadBeginResult { - 1: required Status.TStatus status - 2: optional i64 txn_id -} - struct TUpdateExportTaskStatusRequest { 1: required FrontendServiceVersion protocolVersion 2: required Types.TUniqueId taskId @@ -627,16 +564,10 @@ struct TRLTaskTxnCommitAttachment { 11: optional string errorLogUrl } -struct TMiniLoadTxnCommitAttachment { - 1: required i64 loadedRows - 2: required i64 filteredRows - 3: optional string errorLogUrl -} - struct TTxnCommitAttachment { 1: required Types.TLoadType loadType 2: optional TRLTaskTxnCommitAttachment rlTaskTxnCommitAttachment - 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment +// 3: optional TMiniLoadTxnCommitAttachment mlTxnCommitAttachment } struct TLoadTxnCommitRequest { @@ -751,14 +682,6 @@ service FrontendService { MasterService.TMasterResult finishTask(1: MasterService.TFinishTaskRequest request) MasterService.TMasterResult report(1: MasterService.TReportRequest request) MasterService.TFetchResourceResult fetchResource() - - // those three method are used for asynchronous mini load which will be abandoned - TFeResult miniLoad(1: TMiniLoadRequest request) - TFeResult updateMiniEtlTaskStatus(1: TUpdateMiniEtlTaskStatusRequest request) - TFeResult loadCheck(1: TLoadCheckRequest request) - // this method is used for streaming mini load - TMiniLoadBeginResult miniLoadBegin(1: TMiniLoadBeginRequest request) - TFeResult isMethodSupported(1: TIsMethodSupportedRequest request) TMasterOpResult forward(1: TMasterOpRequest params) diff --git a/samples/mini_load/python/mini_load_utils.py b/samples/mini_load/python/mini_load_utils.py deleted file mode 100644 index 88753680e0..0000000000 --- a/samples/mini_load/python/mini_load_utils.py +++ /dev/null @@ -1,157 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - -http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. -""" - -import os -import subprocess - - -class DorisMiniLoadClient(object): - """ load file to doris """ - - def __init__(self, db_host, db_port, db_name, - db_user, db_password, file_name, table, load_timeout): - """ - init - :param db_host: db host - :param db_port: db port - :param db_name: db name - :param db_user: db user - :param db_password: db password - :param file_name: local file path - :param table: db table - :param load_timeout:mini load timeout, defalut 86400 seconds. - """ - self.file_name = file_name - self.table = table - self.load_host = db_host - self.load_port = db_port - self.load_database = db_name - self.load_user = db_user - self.load_password = db_password - self.load_timeout = load_timeout - - def get_label(self): - """ - 获取label前缀 - :return: label - """ - - return '_'.join([self.table, os.path.basename(self.file_name)]) - - def load_doris(self): - """ - load file to doris by curl, allow 3 times to retry. - :return: mini load label - """ - retry_time = 0 - label = self.get_label() - - while retry_time < 3: - load_cmd = "curl" - param_location = "--location-trusted" - param_user = "%s:%s" % (self.load_user, self.load_password) - param_file = "%s" % self.file_name - param_url = "http://%s:%s/api/%s/%s/_load?label=%s&timeout=" % (self.load_host, self.load_port, - self.load_database, - self.table, label, self.load_timeout) - - load_subprocess = subprocess.Popen([load_cmd, param_location, - "-u", param_user, "-T", param_file, param_url]) - - # Wait for child process to terminate. Returns returncode attribute - load_subprocess.wait() - - # check returncode; - # If fail, retry 3 times - if load_subprocess.returncode != 0: - print """Load to doris failed! LABEL is %s, Retry time is %d """ % (label, retry_time) - retry_time += 1 - # If success, print log, and break retry loop - if load_subprocess.returncode == 0: - print """Load to doris success! LABEL is %s, Retry time is %d """ % (label, retry_time) - break - - return label - - @classmethod - def check_load_status(cls, label, host, port, user, password, database): - """ - check async mini load process status. - :param label:mini load label - :param host: db host - :param port: db port - :param user: db user - :param password: db password - :param database: db database - :return: check async mini load process status. - """ - - db_conn = MySQLdb.connect(host=host,port=port,user=user,passwd=password,db=database) - - db_cursor = db_conn.cursor() - check_status_sql = "show load where label = '%s' order by CreateTime desc limit 1" % label - - db_cursor.execute(check_status_sql) - rows = db_cursor.fetchall() - - # timeout config: 60 minutes. - timeout = 60 * 60 - - while timeout > 0: - if len(rows) == 0: - print """Load label: %s doesn't exist""" % label - return - load_status = rows[0][2] - print "mini load status: " + load_status - if load_status == 'FINISHED': - print """Async mini load to db success! label is %s""" % label - break - if load_status == 'CANCELLED': - print """Async load to db failed! label is %s""" % label - break - timeout = timeout - 5 - time.sleep(5) - db_cursor.execute(sql) - rows = db_cursor.fetchall() - - if time_out <= 0: - print """Async load to db timeout! timeout second is: %s, label is %s""" % (time_out, label) - - -if __name__ == '__main__': - """ - mini_load demo. - There is no need to install subprocess in Python 2.7. It is a standard module that is built in. - You need input db config & load param. - """ - db_host = "db_conn_host" - db_port = "port" - db_name = "db_name" - db_user = "db_user" - db_password = "db_password" - file_name = "file_name" - table = "db_table" - # default load_time_out, seconds - load_timeout = 86400 - doris_client = DorisMiniLoadClient( - db_host, db_port, db_name, db_user, db_password, file_name, table, load_timeout) - doris_client.check_load_status(doris_client.load_doirs(), db_host, db_port, db_user, db_password, db_name) - print "load to doris end"