From c59a8cb15d6022a54b952864f6cb9b950677eb6d Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 30 Jan 2023 16:53:56 +0800 Subject: [PATCH] [refactor](remove unused code) remove log error hub (#16183) --------- Co-authored-by: yiguolei --- be/src/pipeline/pipeline_fragment_context.cpp | 3 - be/src/runtime/plan_fragment_executor.cpp | 3 - be/src/runtime/runtime_state.cpp | 32 --- be/src/runtime/runtime_state.h | 18 -- be/src/util/CMakeLists.txt | 9 - be/src/util/broker_load_error_hub.cpp | 96 ------- be/src/util/broker_load_error_hub.h | 82 ------ be/src/util/load_error_hub.cpp | 77 ------ be/src/util/load_error_hub.h | 63 ----- be/src/util/mysql_load_error_hub.cpp | 142 ---------- be/src/util/mysql_load_error_hub.h | 102 ------- be/src/util/null_load_error_hub.cpp | 48 ---- be/src/util/null_load_error_hub.h | 52 ---- .../org/apache/doris/alter/SystemHandler.java | 4 - .../analysis/AlterLoadErrorUrlClause.java | 16 +- .../java/org/apache/doris/catalog/Env.java | 57 ---- .../common/proc/LoadErrorHubProcNode.java | 47 ---- .../apache/doris/common/proc/ProcService.java | 1 - .../apache/doris/load/BrokerLoadErrorHub.java | 127 --------- .../main/java/org/apache/doris/load/Load.java | 82 ------ .../org/apache/doris/load/LoadErrorHub.java | 260 +++++++----------- .../apache/doris/load/MysqlLoadErrorHub.java | 210 -------------- .../org/apache/doris/persist/EditLog.java | 4 +- .../doris/persist/meta/MetaPersistMethod.java | 6 - .../apache/doris/persist/meta/MetaReader.java | 12 +- .../persist/meta/PersistMetaModules.java | 2 +- .../doris/planner/StreamLoadPlanner.java | 11 - .../java/org/apache/doris/qe/Coordinator.java | 11 - .../org/apache/doris/qe/ShowExecutor.java | 19 -- 29 files changed, 116 insertions(+), 1480 deletions(-) delete mode 100644 be/src/util/broker_load_error_hub.cpp delete mode 100644 be/src/util/broker_load_error_hub.h delete mode 100644 be/src/util/load_error_hub.cpp delete mode 100644 be/src/util/load_error_hub.h delete mode 100644 be/src/util/mysql_load_error_hub.cpp delete mode 100644 be/src/util/mysql_load_error_hub.h delete mode 100644 be/src/util/null_load_error_hub.cpp delete mode 100644 be/src/util/null_load_error_hub.h delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadErrorHubProcNode.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/BrokerLoadErrorHub.java delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/load/MysqlLoadErrorHub.java diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 7704628da0..17187a7c21 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -189,9 +189,6 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re if (request.__isset.load_job_id) { _runtime_state->set_load_job_id(request.load_job_id); } - if (request.__isset.load_error_hub_info) { - _runtime_state->set_load_error_hub_info(request.load_error_hub_info); - } if (request.query_options.__isset.is_report_success) { fragment_context->set_is_report_success(request.query_options.is_report_success); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 748191b177..91d9b414e4 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -114,9 +114,6 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request, if (request.__isset.load_job_id) { _runtime_state->set_load_job_id(request.load_job_id); } - if (request.__isset.load_error_hub_info) { - _runtime_state->set_load_error_hub_info(request.load_error_hub_info); - } if (request.query_options.__isset.is_report_success) { _is_report_success = request.query_options.is_report_success; diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index c2e0700678..8ab72aedd1 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -35,7 +35,6 @@ #include "runtime/memory/mem_tracker.h" #include "runtime/runtime_filter_mgr.h" #include "util/file_utils.h" -#include "util/load_error_hub.h" #include "util/pretty_printer.h" #include "util/timezone_utils.h" #include "util/uid_util.h" @@ -154,10 +153,6 @@ RuntimeState::~RuntimeState() { _error_log_file = nullptr; } - if (_error_hub != nullptr) { - _error_hub->close(); - } - _obj_pool->clear(); _runtime_filter_mgr.reset(); } @@ -342,37 +337,10 @@ Status RuntimeState::append_error_msg_to_file(std::function line, if (out.size() > 0) { (*_error_log_file) << fmt::to_string(out) << std::endl; - export_load_error(fmt::to_string(out)); } return Status::OK(); } -const int64_t HUB_MAX_ERROR_NUM = 10; - -void RuntimeState::export_load_error(const std::string& err_msg) { - if (_error_hub == nullptr) { - std::lock_guard lock(_create_error_hub_lock); - if (_error_hub == nullptr) { - if (_load_error_hub_info == nullptr) { - return; - } - - Status st = LoadErrorHub::create_hub(_exec_env, _load_error_hub_info.get(), - _error_log_file_path, &_error_hub); - if (!st.ok()) { - LOG(WARNING) << "failed to create load error hub: " << st; - return; - } - } - } - - if (_error_row_number <= HUB_MAX_ERROR_NUM) { - LoadErrorHub::ErrorMsg err(_load_job_id, err_msg); - // TODO(lingbin): think if should check return value? - _error_hub->export_error(err); - } -} - int64_t RuntimeState::get_load_mem_limit() { // TODO: the code is abandoned, it can be deleted after v1.3 if (_query_options.__isset.load_mem_limit && _query_options.load_mem_limit > 0) { diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index f9f4b8df04..7128b17f17 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -43,7 +43,6 @@ class DataStreamRecvr; class ResultBufferMgr; class TmpFileMgr; class BufferedBlockMgr; -class LoadErrorHub; class RowDescriptor; class RuntimeFilterMgr; @@ -214,18 +213,6 @@ public: const int64_t load_job_id() const { return _load_job_id; } - // we only initialize object for load jobs - void set_load_error_hub_info(const TLoadErrorHubInfo& hub_info) { - TLoadErrorHubInfo* info = new TLoadErrorHubInfo(hub_info); - _load_error_hub_info.reset(info); - } - - // only can be invoded after set its value - const TLoadErrorHubInfo* load_error_hub_info() { - // DCHECK(_load_error_hub_info != nullptr); - return _load_error_hub_info.get(); - } - const int64_t get_normal_row_number() const { return _normal_row_number; } const void set_normal_row_number(int64_t number) { _normal_row_number = number; } @@ -271,8 +258,6 @@ public: _num_rows_load_unselected.fetch_add(num_rows); } - void export_load_error(const std::string& error_msg); - void set_per_fragment_instance_idx(int idx) { _per_fragment_instance_idx = idx; } int per_fragment_instance_idx() const { return _per_fragment_instance_idx; } @@ -495,15 +480,12 @@ private: std::string _db_name; std::string _load_dir; int64_t _load_job_id; - std::unique_ptr _load_error_hub_info; // mini load int64_t _normal_row_number; int64_t _error_row_number; std::string _error_log_file_path; std::ofstream* _error_log_file = nullptr; // error file path, absolute path - std::unique_ptr _error_hub; - std::mutex _create_error_hub_lock; std::vector _tablet_commit_infos; std::vector _error_tablet_infos; diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 33ca9a5e49..3d558f3993 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -64,10 +64,7 @@ set(UTIL_FILES mysql_row_buffer.cpp error_util.cc filesystem_util.cc - load_error_hub.cpp - broker_load_error_hub.cpp broker_storage_backend.cpp - null_load_error_hub.cpp time.cpp os_info.cpp os_util.cpp @@ -113,12 +110,6 @@ set(UTIL_FILES libjvm_loader.cpp ) -if (WITH_MYSQL) - set(UTIL_FILES ${UTIL_FILES} - mysql_load_error_hub.cpp - ) -endif() - if (OS_MACOSX) list(REMOVE_ITEM UTIL_FILES perf_counters.cpp disk_info.cpp) list(APPEND UTIL_FILES perf_counters_mac.cpp disk_info_mac.cpp) diff --git a/be/src/util/broker_load_error_hub.cpp b/be/src/util/broker_load_error_hub.cpp deleted file mode 100644 index a8c0441b27..0000000000 --- a/be/src/util/broker_load_error_hub.cpp +++ /dev/null @@ -1,96 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/broker_load_error_hub.h" - -#include "io/broker_writer.h" -#include "util/defer_op.h" - -namespace doris { - -BrokerLoadErrorHub::BrokerLoadErrorHub(ExecEnv* env, const TBrokerErrorHubInfo& info, - const std::string& error_log_file_name) - : _env(env), _info(info, error_log_file_name), _broker_writer(nullptr) {} - -BrokerLoadErrorHub::~BrokerLoadErrorHub() { - delete _broker_writer; - _broker_writer = nullptr; -} - -Status BrokerLoadErrorHub::prepare() { - _broker_writer = new BrokerWriter(_env, _info.addrs, _info.props, _info.path, 0); - - RETURN_IF_ERROR(_broker_writer->open()); - - _is_valid = true; - return Status::OK(); -} - -Status BrokerLoadErrorHub::export_error(const ErrorMsg& error_msg) { - std::lock_guard lock(_mtx); - ++_total_error_num; - - if (!_is_valid) { - return Status::OK(); - } - - _error_msgs.push(error_msg); - if (_error_msgs.size() >= EXPORTER_THRESHOLD) { - RETURN_IF_ERROR(write_to_broker()); - } - - return Status::OK(); -} - -Status BrokerLoadErrorHub::close() { - std::lock_guard lock(_mtx); - - if (!_is_valid) { - return Status::OK(); - } - - if (!_error_msgs.empty()) { - write_to_broker(); - } - - // close anyway - _broker_writer->close(); - - _is_valid = false; - return Status::OK(); -} - -Status BrokerLoadErrorHub::write_to_broker() { - std::stringstream ss; - while (!_error_msgs.empty()) { - ss << _error_msgs.front().job_id << ": " << _error_msgs.front().msg << "\n"; - _error_msgs.pop(); - } - - const std::string& msg = ss.str(); - size_t written_len = 0; - RETURN_IF_ERROR(_broker_writer->write((uint8_t*)msg.c_str(), msg.length(), &written_len)); - return Status::OK(); -} - -std::string BrokerLoadErrorHub::debug_string() const { - std::stringstream out; - out << "(total_error_num=" << _total_error_num << ")"; - return out.str(); -} - -} // end namespace doris diff --git a/be/src/util/broker_load_error_hub.h b/be/src/util/broker_load_error_hub.h deleted file mode 100644 index 0878a5a441..0000000000 --- a/be/src/util/broker_load_error_hub.h +++ /dev/null @@ -1,82 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "gen_cpp/PaloInternalService_types.h" -#include "util/load_error_hub.h" - -namespace doris { - -class BrokerWriter; -class ExecEnv; - -// Broker load error hub will write load error info to the specified -// remote storage via broker. -// We should only open this error hub if there are error line. -// Because open the writer via broker may cost several seconds. -class BrokerLoadErrorHub : public LoadErrorHub { -public: - struct BrokerInfo { - std::vector addrs; - // path should be like: - // xxx://yyy/file_name - std::string path; - std::map props; - - BrokerInfo(const TBrokerErrorHubInfo& t_info, const std::string& error_log_file_name) - : props(t_info.prop) { - path = t_info.path + "/" + error_log_file_name; - addrs.push_back(t_info.broker_addr); - } - }; - - BrokerLoadErrorHub(ExecEnv* env, const TBrokerErrorHubInfo& info, - const std::string& error_log_file_name); - - virtual ~BrokerLoadErrorHub(); - - virtual Status prepare(); - - virtual Status export_error(const ErrorMsg& error_msg); - - virtual Status close(); - - virtual std::string debug_string() const; - -private: - Status write_to_broker(); - - ExecEnv* _env; - BrokerInfo _info; - - // the number in a write batch. - static const int32_t EXPORTER_THRESHOLD = 20; - - BrokerWriter* _broker_writer; - std::mutex _mtx; - std::queue _error_msgs; - int32_t _total_error_num = 0; - -}; // end class BrokerLoadErrorHub - -} // end namespace doris diff --git a/be/src/util/load_error_hub.cpp b/be/src/util/load_error_hub.cpp deleted file mode 100644 index 801401632b..0000000000 --- a/be/src/util/load_error_hub.cpp +++ /dev/null @@ -1,77 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "util/load_error_hub.h" - -#include - -#include "common/logging.h" -#include "gen_cpp/PaloInternalService_types.h" -#include "util/broker_load_error_hub.h" -#include "util/mysql_load_error_hub.h" -#include "util/null_load_error_hub.h" - -namespace doris { - -Status LoadErrorHub::create_hub(ExecEnv* env, const TLoadErrorHubInfo* t_hub_info, - const std::string& error_log_file_name, - std::unique_ptr* hub) { - LoadErrorHub* tmp_hub = nullptr; - - if (t_hub_info == nullptr) { - tmp_hub = new NullLoadErrorHub(); - tmp_hub->prepare(); - hub->reset(tmp_hub); - return Status::OK(); - } - - VLOG_ROW << "create_hub: " << apache::thrift::ThriftDebugString(*t_hub_info).c_str(); - - switch (t_hub_info->type) { - case TErrorHubType::MYSQL: -#ifdef DORIS_WITH_MYSQL - tmp_hub = new MysqlLoadErrorHub(t_hub_info->mysql_info); - tmp_hub->prepare(); - hub->reset(tmp_hub); - break; -#else - return Status::InternalError( - "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON"); -#endif - case TErrorHubType::BROKER: { - // the origin file name may contains __shard_0/xxx - // replace the '/' with '_' - std::string copied_name(error_log_file_name); - std::replace(copied_name.begin(), copied_name.end(), '/', '_'); - tmp_hub = new BrokerLoadErrorHub(env, t_hub_info->broker_info, copied_name); - tmp_hub->prepare(); - hub->reset(tmp_hub); - break; - } - case TErrorHubType::NULL_TYPE: - tmp_hub = new NullLoadErrorHub(); - tmp_hub->prepare(); - hub->reset(tmp_hub); - break; - default: - return Status::InternalError("Unknown hub type {}", t_hub_info->type); - } - - return Status::OK(); -} - -} // end namespace doris diff --git a/be/src/util/load_error_hub.h b/be/src/util/load_error_hub.h deleted file mode 100644 index 256a6f9877..0000000000 --- a/be/src/util/load_error_hub.h +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include - -#include "common/status.h" - -namespace doris { - -class ExecEnv; -class TLoadErrorHubInfo; - -class LoadErrorHub { -public: - struct ErrorMsg { - int64_t job_id; - std::string msg; - // enum ErrType type; - - ErrorMsg(int64_t id, const std::string& message) : job_id(id), msg(message) {} - }; - - LoadErrorHub() {} - - virtual ~LoadErrorHub() {} - - static Status create_hub(ExecEnv* env, const TLoadErrorHubInfo* t_hub_info, - const std::string& error_log_file_name, - std::unique_ptr* hub); - - virtual Status prepare() = 0; - - virtual Status export_error(const ErrorMsg& error_msg) = 0; - - virtual Status close() = 0; - - virtual std::string debug_string() const = 0; - -protected: - // to show mysql url is valid - bool _is_valid = false; - - int32_t _total_error_num = 0; - -}; // end class LoadErrorHub - -} // end namespace doris diff --git a/be/src/util/mysql_load_error_hub.cpp b/be/src/util/mysql_load_error_hub.cpp deleted file mode 100644 index a9943016e5..0000000000 --- a/be/src/util/mysql_load_error_hub.cpp +++ /dev/null @@ -1,142 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#define __DorisMysql MYSQL -#include "common/logging.h" -#include "mysql_load_error_hub.h" -#include "util/defer_op.h" - -namespace doris { - -MysqlLoadErrorHub::MysqlLoadErrorHub(const TMysqlErrorHubInfo& info) : _info(info) {} - -MysqlLoadErrorHub::~MysqlLoadErrorHub() {} - -Status MysqlLoadErrorHub::prepare() { - _is_valid = true; - return Status::OK(); -} - -Status MysqlLoadErrorHub::export_error(const ErrorMsg& error_msg) { - std::lock_guard lock(_mtx); - ++_total_error_num; - - if (!_is_valid) { - return Status::OK(); - } - - _error_msgs.push(error_msg); - if (_error_msgs.size() >= EXPORTER_THRESHOLD) { - RETURN_IF_ERROR(write_mysql()); - } - - return Status::OK(); -} - -Status MysqlLoadErrorHub::close() { - std::lock_guard lock(_mtx); - - if (!_is_valid) { - return Status::OK(); - } - - if (!_error_msgs.empty()) { - RETURN_IF_ERROR(write_mysql()); - } - - return Status::OK(); -} - -Status MysqlLoadErrorHub::write_mysql() { - MYSQL* my_conn = nullptr; - Status st = open_mysql_conn(&my_conn); - if (!st.ok()) { - _is_valid = false; - return st; - } - - Defer close_mysql_conn {[=]() { mysql_close(my_conn); }}; - - Status status; - std::stringstream sql_stream; - while (!_error_msgs.empty()) { - status = gen_sql(my_conn, _error_msgs.front(), &sql_stream); - if (!status.ok()) { - return error_status("fail to gen sql", my_conn); - } - _error_msgs.pop(); - } - - int sql_result = mysql_query(my_conn, sql_stream.str().c_str()); - if (sql_result != 0) { - LOG(WARNING) << "mysql insert failed. query=[" << sql_stream.str() << "]"; - return error_status("mysql query failed.", my_conn); - } - - VLOG_PROGRESS << "mysql query success. query =" << sql_stream.str(); - - return Status::OK(); -} - -Status MysqlLoadErrorHub::gen_sql(MYSQL* my_conn, const LoadErrorHub::ErrorMsg& error_msg, - std::stringstream* sql_stream) { - char* sql_start = &_escape_buff[0]; - size_t msg_size = error_msg.msg.size(); - if (msg_size > EXPORTER_MAX_LINE_SIZE) { - msg_size = EXPORTER_MAX_LINE_SIZE; - } - - (*sql_stream) << "insert into " << _info.table << " (job_id, error_msg) values(" - << error_msg.job_id << ", '" << sql_start << "'); "; - return Status::OK(); -} - -Status MysqlLoadErrorHub::open_mysql_conn(MYSQL** my_conn) { - *my_conn = mysql_init(nullptr); - if (nullptr == *my_conn) { - LOG(WARNING) << "load error export's mysql init failed."; - return Status::InternalError("mysql init failed."); - } - VLOG_ROW << "MysqlLoadErrorHub::init"; - - if (!mysql_real_connect(*my_conn, _info.host.c_str(), _info.user.c_str(), _info.passwd.c_str(), - _info.db.c_str(), _info.port, nullptr, CLIENT_MULTI_STATEMENTS)) { - LOG(WARNING) << "fail to connect Mysql: " - << "Host: " << _info.host << " port: " << _info.port << " user: " << _info.user - << " passwd: " << _info.passwd << " db: " << _info.db; - return error_status("load error mysql real connect failed.", *my_conn); - } - - return Status::OK(); -} - -Status MysqlLoadErrorHub::error_status(const std::string& prefix, MYSQL* my_conn) { - std::stringstream msg; - msg << prefix << " Err: " << mysql_error(my_conn); - LOG(WARNING) << msg.str(); - return Status::InternalError(msg.str()); -} - -std::string MysqlLoadErrorHub::debug_string() const { - std::stringstream out; - out << "(total_error_num=" << _total_error_num << ")"; - return out.str(); -} - -} // end namespace doris diff --git a/be/src/util/mysql_load_error_hub.h b/be/src/util/mysql_load_error_hub.h deleted file mode 100644 index 61a54ab05c..0000000000 --- a/be/src/util/mysql_load_error_hub.h +++ /dev/null @@ -1,102 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include -#include - -#include "gen_cpp/PaloInternalService_types.h" -#include "util/load_error_hub.h" - -#ifndef __DorisMysql -#define __DorisMysql void -#endif - -namespace doris { - -// For now every load job has its own mysql connection, -// and we use short connection to avoid to many connections. -// we write to mysql in a batch of data, not every data error msg, - -class MysqlLoadErrorHub : public LoadErrorHub { -public: - struct MysqlInfo { - std::string host; - int32_t port; - std::string user; - std::string passwd; - std::string db; - std::string table; - - MysqlInfo(const std::string& h, int32_t p, const std::string& u, const std::string& pwd, - const std::string& d, const std::string& t) - : host(h), port(p), user(u), passwd(pwd), db(d), table(t) {} - - MysqlInfo(const TMysqlErrorHubInfo& t_info) - : host(t_info.host), - port(t_info.port), - user(t_info.user), - passwd(t_info.passwd), - db(t_info.db), - table(t_info.table) {} - }; - - MysqlLoadErrorHub(const TMysqlErrorHubInfo& info); - - virtual ~MysqlLoadErrorHub(); - - virtual Status prepare(); - - virtual Status export_error(const ErrorMsg& error_msg); - - virtual Status close(); - - virtual std::string debug_string() const; - -private: - Status open_mysql_conn(__DorisMysql** my_conn); - - Status write_mysql(); - - Status gen_sql(__DorisMysql* my_conn, const LoadErrorHub::ErrorMsg& error_msg, - std::stringstream* sql_stream); - - Status error_status(const std::string& prefix, __DorisMysql* my_conn); - - MysqlInfo _info; - - // the number in a write batch. - static const int32_t EXPORTER_THRESHOLD = 100; - static const int32_t EXPORTER_MAX_ERROR_NUM = 50; - - // the max size of one line - static const int32_t EXPORTER_MAX_LINE_SIZE = 500; - - std::mutex _mtx; - std::queue _error_msgs; - int32_t _total_error_num = 0; - - // should at least (line_length * 2 + 1) long - std::array _escape_buff; - -}; // end class MysqlLoadErrorHub - -} // end namespace doris diff --git a/be/src/util/null_load_error_hub.cpp b/be/src/util/null_load_error_hub.cpp deleted file mode 100644 index f25a271c9c..0000000000 --- a/be/src/util/null_load_error_hub.cpp +++ /dev/null @@ -1,48 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "null_load_error_hub.h" - -namespace doris { - -NullLoadErrorHub::NullLoadErrorHub() {} - -NullLoadErrorHub::~NullLoadErrorHub() {} - -Status NullLoadErrorHub::prepare() { - _is_valid = true; - return Status::OK(); -} - -Status NullLoadErrorHub::export_error(const ErrorMsg& error_msg) { - std::lock_guard lock(_mtx); - ++_total_error_num; - - return Status::OK(); -} - -Status NullLoadErrorHub::close() { - return Status::OK(); -} - -std::string NullLoadErrorHub::debug_string() const { - std::stringstream out; - out << "NullLoadErrorHub(total_error_num=" << _total_error_num << ")"; - return out.str(); -} - -} // end namespace doris diff --git a/be/src/util/null_load_error_hub.h b/be/src/util/null_load_error_hub.h deleted file mode 100644 index 734e9b9f61..0000000000 --- a/be/src/util/null_load_error_hub.h +++ /dev/null @@ -1,52 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#pragma once - -#include -#include -#include -#include - -#include "load_error_hub.h" - -namespace doris { - -// do not export error. -// only record some metric to some memory(like total error row) for now. - -class NullLoadErrorHub : public LoadErrorHub { -public: - NullLoadErrorHub(); - - virtual ~NullLoadErrorHub(); - - virtual Status prepare(); - - virtual Status export_error(const ErrorMsg& error_msg); - - virtual Status close(); - - virtual std::string debug_string() const; - -private: - std::mutex _mtx; - std::queue _error_msgs; - -}; // end class NullLoadErrorHub - -} // end namespace doris diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java index 0211ff59ae..2c9da96a39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SystemHandler.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.AddBackendClause; import org.apache.doris.analysis.AddFollowerClause; import org.apache.doris.analysis.AddObserverClause; import org.apache.doris.analysis.AlterClause; -import org.apache.doris.analysis.AlterLoadErrorUrlClause; import org.apache.doris.analysis.CancelAlterSystemStmt; import org.apache.doris.analysis.CancelStmt; import org.apache.doris.analysis.DecommissionBackendClause; @@ -167,9 +166,6 @@ public class SystemHandler extends AlterHandler { } else if (alterClause instanceof ModifyBrokerClause) { ModifyBrokerClause clause = (ModifyBrokerClause) alterClause; Env.getCurrentEnv().getBrokerMgr().execute(clause); - } else if (alterClause instanceof AlterLoadErrorUrlClause) { - AlterLoadErrorUrlClause clause = (AlterLoadErrorUrlClause) alterClause; - Env.getCurrentEnv().getLoadInstance().setLoadErrorHubInfo(clause.getProperties()); } else if (alterClause instanceof ModifyBackendClause) { Env.getCurrentSystemInfo().modifyBackends(((ModifyBackendClause) alterClause)); } else { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java index e2b7ef7364..01c6fab9cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterLoadErrorUrlClause.java @@ -20,9 +20,7 @@ package org.apache.doris.analysis; import org.apache.doris.alter.AlterOpType; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.util.PrintableMap; -import org.apache.doris.load.LoadErrorHub; -import com.google.common.base.Strings; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -35,7 +33,6 @@ public class AlterLoadErrorUrlClause extends AlterClause { private static final Logger LOG = LogManager.getLogger(AlterLoadErrorUrlClause.class); private Map properties; - private LoadErrorHub.Param param; public AlterLoadErrorUrlClause(Map properties) { super(AlterOpType.ALTER_OTHER); @@ -49,18 +46,7 @@ public class AlterLoadErrorUrlClause extends AlterClause { @Override public void analyze(Analyzer analyzer) throws AnalysisException { - if (properties == null || properties.isEmpty()) { - throw new AnalysisException("Load errors hub's properties are missing"); - } - - String type = properties.get("type"); - if (Strings.isNullOrEmpty(type)) { - throw new AnalysisException("Load errors hub's type is missing"); - } - - if (!type.equalsIgnoreCase("MYSQL") && !type.equalsIgnoreCase("BROKER")) { - throw new AnalysisException("Load errors hub's type should be MYSQL or BROKER"); - } + throw new AnalysisException("Load errors hub is not supported"); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 94b809d00d..0a238e9dbd 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -149,7 +149,6 @@ import org.apache.doris.load.ExportChecker; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; -import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.StreamLoadRecordMgr; import org.apache.doris.load.loadv2.LoadEtlChecker; import org.apache.doris.load.loadv2.LoadJobScheduler; @@ -1712,62 +1711,6 @@ public class Env { return getInternalCatalog().loadDb(dis, checksum); } - public long saveLoadJob(CountingDataOutputStream dos, long checksum) throws IOException { - // 1. save load.dbToLoadJob, force set to 0 since there should be none load jobs - int jobSize = 0; - checksum ^= jobSize; - dos.writeInt(jobSize); - - // 2. save delete jobs - // delete jobs are moved to DeleteHandler. So here we just set job size as 0. - jobSize = 0; - checksum ^= jobSize; - dos.writeInt(jobSize); - - // 3. load error hub info - LoadErrorHub.Param param = load.getLoadErrorHubInfo(); - param.write(dos); - - // 4. save delete load job info - // delete jobs are moved to DeleteHandler. So here we just set job size as 0. - int deleteJobSize = 0; - checksum ^= deleteJobSize; - dos.writeInt(deleteJobSize); - return checksum; - } - - public long loadLoadJob(DataInputStream dis, long checksum) throws IOException, DdlException { - // load jobs - int jobSize = dis.readInt(); - long newChecksum = checksum ^ jobSize; - - if (jobSize > 0) { - LOG.warn("there should be no load jobs, please rollback to 1.2.x and check if there are hadoop load jobs"); - throw new RuntimeException("there should be no load jobs, please rollback to 1.2.x " - + "and check if there are hadoop load jobs"); - } - - // delete jobs - // Delete job has been moved to DeleteHandler. Here the jobSize is always 0, we need do nothing. - jobSize = dis.readInt(); - Preconditions.checkState(jobSize == 0, jobSize); - newChecksum ^= jobSize; - - // load error hub info - LoadErrorHub.Param param = new LoadErrorHub.Param(); - param.readFields(dis); - load.setLoadErrorHubInfo(param); - - // 4. load delete jobs - // Delete job has been moved to DeleteHandler. Here the jobSize is always 0, we need do nothing. - int deleteJobSize = dis.readInt(); - Preconditions.checkState(deleteJobSize == 0, deleteJobSize); - newChecksum ^= deleteJobSize; - - LOG.info("finished replay loadJob from image"); - return newChecksum; - } - public long loadExportJob(DataInputStream dis, long checksum) throws IOException, DdlException { long curTime = System.currentTimeMillis(); long newChecksum = checksum; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadErrorHubProcNode.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadErrorHubProcNode.java deleted file mode 100644 index 16aa240183..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/LoadErrorHubProcNode.java +++ /dev/null @@ -1,47 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.common.proc; - -import org.apache.doris.catalog.Env; -import org.apache.doris.load.LoadErrorHub; - -import com.google.common.collect.ImmutableList; - -public class LoadErrorHubProcNode implements ProcNodeInterface { - public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() - .add("Type").add("Properties") - .build(); - - private Env env; - - public LoadErrorHubProcNode(Env env) { - this.env = env; - } - - @Override - public ProcResult fetchResult() { - BaseProcResult result = new BaseProcResult(); - result.setNames(TITLE_NAMES); - LoadErrorHub.Param param = env.getLoadInstance().getLoadErrorHubInfo(); - if (param != null) { - result.addRow(param.getInfo()); - } - - return result; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java index c62cdcf92c..e543c5b435 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java @@ -45,7 +45,6 @@ public final class ProcService { root.register("frontends", new FrontendsProcNode(Env.getCurrentEnv())); root.register("brokers", Env.getCurrentEnv().getBrokerMgr().getProcNode()); root.register("resources", Env.getCurrentEnv().getResourceMgr().getProcNode()); - root.register("load_error_hub", new LoadErrorHubProcNode(Env.getCurrentEnv())); root.register("transactions", new TransDbProcDir()); root.register("trash", new TrashProcDir()); root.register("monitor", new MonitorProcDir()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerLoadErrorHub.java b/fe/fe-core/src/main/java/org/apache/doris/load/BrokerLoadErrorHub.java deleted file mode 100644 index b2101d530b..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/load/BrokerLoadErrorHub.java +++ /dev/null @@ -1,127 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.load; - -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.FsBroker; -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.PrintableMap; -import org.apache.doris.thrift.TBrokerErrorHubInfo; -import org.apache.doris.thrift.TNetworkAddress; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -public class BrokerLoadErrorHub extends LoadErrorHub { - - private BrokerParam brokerParam; - - public BrokerLoadErrorHub(BrokerParam brokerParam) { - this.brokerParam = brokerParam; - } - - public BrokerParam getBrokerParam() { - return brokerParam; - } - - public static class BrokerParam implements Writable { - private String brokerName; - private String path; - private Map prop = Maps.newHashMap(); - - // for persist - public BrokerParam() { - } - - public BrokerParam(String brokerName, String path, Map prop) { - this.brokerName = brokerName; - this.path = path; - this.prop = prop; - } - - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, brokerName); - Text.writeString(out, path); - out.writeInt(prop.size()); - for (Map.Entry entry : prop.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - } - - public void readFields(DataInput in) throws IOException { - brokerName = Text.readString(in); - path = Text.readString(in); - int size = in.readInt(); - for (int i = 0; i < size; i++) { - String key = Text.readString(in); - String val = Text.readString(in); - prop.put(key, val); - } - } - - public TBrokerErrorHubInfo toThrift() { - FsBroker fsBroker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName); - if (fsBroker == null) { - return null; - } - TBrokerErrorHubInfo info = new TBrokerErrorHubInfo(new TNetworkAddress(fsBroker.ip, fsBroker.port), - path, prop); - return info; - } - - public String getBrief() { - Map briefMap = Maps.newHashMap(prop); - briefMap.put("name", brokerName); - briefMap.put("path", path); - PrintableMap printableMap = new PrintableMap<>(briefMap, "=", true, false, true); - return printableMap.toString(); - } - } - - // Broker load error hub does not support showing detail error msg in 'show load warnings' stmt. - // User need to file error file in remote storage with file name showed in 'show load' stmt - @Override - public List fetchLoadError(long jobId) { - List result = Lists.newArrayList(); - final String hint = "Find detail load error info on '" - + brokerParam.path + "' with file name showed in 'SHOW LOAD' stmt"; - ErrorMsg errorMsg = new ErrorMsg(0, hint); - result.add(errorMsg); - return result; - } - - @Override - public boolean prepare() { - return true; - } - - @Override - public boolean close() { - return true; - } - -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index 9cb4cedd00..2a6b6dbc30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -33,11 +33,8 @@ import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.PartitionNames; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; -import org.apache.doris.analysis.StorageBackend; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.backup.BlobStorage; -import org.apache.doris.backup.Status; import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -129,8 +126,6 @@ public class Load { // dbId -> set of (label, timestamp) private Map> dbToMiniLabels; // db to mini uncommitted label - private volatile LoadErrorHub.Param loadErrorHubParam = new LoadErrorHub.Param(); - // lock for load job // lock is private and must use after db lock private ReentrantReadWriteLock lock; @@ -1596,83 +1591,6 @@ public class Load { return infos; } - public LoadErrorHub.Param getLoadErrorHubInfo() { - return loadErrorHubParam; - } - - public void setLoadErrorHubInfo(LoadErrorHub.Param info) { - this.loadErrorHubParam = info; - } - - public void setLoadErrorHubInfo(Map properties) throws DdlException { - String type = properties.get("type"); - if (type.equalsIgnoreCase("MYSQL")) { - String host = properties.get("host"); - if (Strings.isNullOrEmpty(host)) { - throw new DdlException("mysql host is missing"); - } - - int port = -1; - try { - port = Integer.valueOf(properties.get("port")); - } catch (NumberFormatException e) { - throw new DdlException("invalid mysql port: " + properties.get("port")); - } - - String user = properties.get("user"); - if (Strings.isNullOrEmpty(user)) { - throw new DdlException("mysql user name is missing"); - } - - String db = properties.get("database"); - if (Strings.isNullOrEmpty(db)) { - throw new DdlException("mysql database is missing"); - } - - String tbl = properties.get("table"); - if (Strings.isNullOrEmpty(tbl)) { - throw new DdlException("mysql table is missing"); - } - - String pwd = Strings.nullToEmpty(properties.get("password")); - - MysqlLoadErrorHub.MysqlParam param = new MysqlLoadErrorHub.MysqlParam(host, port, user, pwd, db, tbl); - loadErrorHubParam = LoadErrorHub.Param.createMysqlParam(param); - } else if (type.equalsIgnoreCase("BROKER")) { - String brokerName = properties.get("name"); - if (Strings.isNullOrEmpty(brokerName)) { - throw new DdlException("broker name is missing"); - } - properties.remove("name"); - - if (!Env.getCurrentEnv().getBrokerMgr().containsBroker(brokerName)) { - throw new DdlException("broker does not exist: " + brokerName); - } - - String path = properties.get("path"); - if (Strings.isNullOrEmpty(path)) { - throw new DdlException("broker path is missing"); - } - properties.remove("path"); - - // check if broker info is invalid - BlobStorage blobStorage = BlobStorage.create(brokerName, StorageBackend.StorageType.BROKER, properties); - Status st = blobStorage.checkPathExist(path); - if (!st.ok()) { - throw new DdlException("failed to visit path: " + path + ", err: " + st.getErrMsg()); - } - - BrokerLoadErrorHub.BrokerParam param = new BrokerLoadErrorHub.BrokerParam(brokerName, path, properties); - loadErrorHubParam = LoadErrorHub.Param.createBrokerParam(param); - } else if (type.equalsIgnoreCase("null")) { - loadErrorHubParam = LoadErrorHub.Param.createNullParam(); - } - - Env.getCurrentEnv().getEditLog().logSetLoadErrorHub(loadErrorHubParam); - - LOG.info("set load error hub info: {}", loadErrorHubParam); - } - public static class JobInfo { public String dbName; public Set tblNames = Sets.newHashSet(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadErrorHub.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadErrorHub.java index 03f1c4a298..e40f0689a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadErrorHub.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadErrorHub.java @@ -19,25 +19,113 @@ package org.apache.doris.load; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; -import org.apache.doris.thrift.TErrorHubType; -import org.apache.doris.thrift.TLoadErrorHubInfo; +import org.apache.doris.common.util.PrintableMap; -import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.util.List; import java.util.Map; public abstract class LoadErrorHub { - private static final Logger LOG = LogManager.getLogger(LoadErrorHub.class); + + public static class MysqlParam implements Writable { + private String host; + private int port; + private String user; + private String passwd; + private String db; + private String table; + + public MysqlParam() { + host = ""; + port = 0; + user = ""; + passwd = ""; + db = ""; + table = ""; + } + + public MysqlParam(String host, int port, String user, String passwd, String db, String table) { + this.host = host; + this.port = port; + this.user = user; + this.passwd = passwd; + this.db = db; + this.table = table; + } + + public String getBrief() { + Map briefMap = Maps.newHashMap(); + briefMap.put("host", host); + briefMap.put("port", String.valueOf(port)); + briefMap.put("user", user); + briefMap.put("password", passwd); + briefMap.put("database", db); + briefMap.put("table", table); + PrintableMap printableMap = new PrintableMap<>(briefMap, "=", true, false, true); + return printableMap.toString(); + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, host); + out.writeInt(port); + Text.writeString(out, user); + Text.writeString(out, passwd); + Text.writeString(out, db); + Text.writeString(out, table); + } + + public void readFields(DataInput in) throws IOException { + host = Text.readString(in); + port = in.readInt(); + user = Text.readString(in); + passwd = Text.readString(in); + db = Text.readString(in); + table = Text.readString(in); + } + } + + public static class BrokerParam implements Writable { + private String brokerName; + private String path; + private Map prop = Maps.newHashMap(); + + // for persist + public BrokerParam() { + } + + public BrokerParam(String brokerName, String path, Map prop) { + this.brokerName = brokerName; + this.path = path; + this.prop = prop; + } + + @Override + public void write(DataOutput out) throws IOException { + Text.writeString(out, brokerName); + Text.writeString(out, path); + out.writeInt(prop.size()); + for (Map.Entry entry : prop.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue()); + } + } + + public void readFields(DataInput in) throws IOException { + brokerName = Text.readString(in); + path = Text.readString(in); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String key = Text.readString(in); + String val = Text.readString(in); + prop.put(key, val); + } + } + } public static final String MYSQL_PROTOCOL = "MYSQL"; public static final String BROKER_PROTOCOL = "BROKER"; @@ -48,137 +136,16 @@ public abstract class LoadErrorHub { NULL_TYPE } - public class ErrorMsg { - private long jobId; - private String msg; - - public ErrorMsg(long id, String message) { - jobId = id; - msg = message; - } - - public long getJobId() { - return jobId; - } - - public String getMsg() { - return msg; - } - } - public static class Param implements Writable { private HubType type; - private MysqlLoadErrorHub.MysqlParam mysqlParam; - private BrokerLoadErrorHub.BrokerParam brokerParam; + private MysqlParam mysqlParam; + private BrokerParam brokerParam; // for replay public Param() { type = HubType.NULL_TYPE; } - public static Param createMysqlParam(MysqlLoadErrorHub.MysqlParam mysqlParam) { - Param param = new Param(); - param.type = HubType.MYSQL_TYPE; - param.mysqlParam = mysqlParam; - return param; - } - - public static Param createBrokerParam(BrokerLoadErrorHub.BrokerParam brokerParam) { - Param param = new Param(); - param.type = HubType.BROKER_TYPE; - param.brokerParam = brokerParam; - return param; - } - - public static Param createNullParam() { - Param param = new Param(); - param.type = HubType.NULL_TYPE; - return param; - } - - public HubType getType() { - return type; - } - - public MysqlLoadErrorHub.MysqlParam getMysqlParam() { - return mysqlParam; - } - - public BrokerLoadErrorHub.BrokerParam getBrokerParam() { - return brokerParam; - } - - public String toString() { - ToStringHelper helper = MoreObjects.toStringHelper(this); - helper.add("type", type.toString()); - switch (type) { - case MYSQL_TYPE: - helper.add("mysql_info", mysqlParam.toString()); - break; - case NULL_TYPE: - helper.add("mysql_info", "null"); - break; - default: - Preconditions.checkState(false, "unknown hub type"); - } - - return helper.toString(); - } - - public TLoadErrorHubInfo toThrift() { - TLoadErrorHubInfo info = new TLoadErrorHubInfo(); - switch (type) { - case MYSQL_TYPE: - info.setType(TErrorHubType.MYSQL); - info.setMysqlInfo(mysqlParam.toThrift()); - break; - case BROKER_TYPE: - info.setType(TErrorHubType.BROKER); - info.setBrokerInfo(brokerParam.toThrift()); - break; - case NULL_TYPE: - info.setType(TErrorHubType.NULL_TYPE); - break; - default: - Preconditions.checkState(false, "unknown hub type"); - } - return info; - } - - public Map toDppConfigInfo() { - Map dppHubInfo = Maps.newHashMap(); - dppHubInfo.put("type", type.toString()); - switch (type) { - case MYSQL_TYPE: - dppHubInfo.put("info", mysqlParam); - break; - case BROKER_TYPE: - Preconditions.checkState(false, "hadoop load do not support broker error hub"); - break; - case NULL_TYPE: - break; - default: - Preconditions.checkState(false, "unknown hub type"); - } - return dppHubInfo; - } - - public List getInfo() { - List info = Lists.newArrayList(); - info.add(type.name()); - switch (type) { - case MYSQL_TYPE: - info.add(mysqlParam.getBrief()); - break; - case BROKER_TYPE: - info.add(brokerParam.getBrief()); - break; - default: - info.add(""); - } - return info; - } - @Override public void write(DataOutput out) throws IOException { Text.writeString(out, type.name()); @@ -200,11 +167,11 @@ public abstract class LoadErrorHub { type = HubType.valueOf(Text.readString(in)); switch (type) { case MYSQL_TYPE: - mysqlParam = new MysqlLoadErrorHub.MysqlParam(); + mysqlParam = new MysqlParam(); mysqlParam.readFields(in); break; case BROKER_TYPE: - brokerParam = new BrokerLoadErrorHub.BrokerParam(); + brokerParam = new BrokerParam(); brokerParam.readFields(in); break; case NULL_TYPE: @@ -214,29 +181,4 @@ public abstract class LoadErrorHub { } } } - - public abstract List fetchLoadError(long jobId); - - public abstract boolean prepare(); - - public abstract boolean close(); - - public static LoadErrorHub createHub(Param param) { - switch (param.getType()) { - case MYSQL_TYPE: { - LoadErrorHub hub = new MysqlLoadErrorHub(param.getMysqlParam()); - hub.prepare(); - return hub; - } - case BROKER_TYPE: { - LoadErrorHub hub = new BrokerLoadErrorHub(param.getBrokerParam()); - hub.prepare(); - return hub; - } - default: - Preconditions.checkState(false, "unknown hub type"); - } - - return null; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/MysqlLoadErrorHub.java b/fe/fe-core/src/main/java/org/apache/doris/load/MysqlLoadErrorHub.java deleted file mode 100644 index 9ba77ebb02..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/load/MysqlLoadErrorHub.java +++ /dev/null @@ -1,210 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.load; - -import org.apache.doris.common.io.Text; -import org.apache.doris.common.io.Writable; -import org.apache.doris.common.util.MysqlUtil; -import org.apache.doris.common.util.PrintableMap; -import org.apache.doris.thrift.TMysqlErrorHubInfo; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; - -public class MysqlLoadErrorHub extends LoadErrorHub { - private static final Logger LOG = LogManager.getLogger(MysqlLoadErrorHub.class); - - private static final String QUERY_SQL_FIRST = "SELECT job_id, error_msg FROM "; - private static final String QUERY_SQL_LAST = " WHERE job_id = ? LIMIT ? "; - private static final long MAX_LINE = 10; - private static final int STMT_TIMEOUT_S = 5; - - private MysqlParam param; - - public static class MysqlParam implements Writable { - private String host; - private int port; - private String user; - private String passwd; - private String db; - private String table; - - public MysqlParam() { - host = ""; - port = 0; - user = ""; - passwd = ""; - db = ""; - table = ""; - } - - public MysqlParam(String host, int port, String user, String passwd, String db, String table) { - this.host = host; - this.port = port; - this.user = user; - this.passwd = passwd; - this.db = db; - this.table = table; - } - - public String getBrief() { - Map briefMap = Maps.newHashMap(); - briefMap.put("host", host); - briefMap.put("port", String.valueOf(port)); - briefMap.put("user", user); - briefMap.put("password", passwd); - briefMap.put("database", db); - briefMap.put("table", table); - PrintableMap printableMap = new PrintableMap<>(briefMap, "=", true, false, true); - return printableMap.toString(); - } - - public String getHost() { - return host; - } - - public int getPort() { - return port; - } - - public String getUser() { - return user; - } - - public String getPasswd() { - return passwd; - } - - public String getDb() { - return db; - } - - public String getTable() { - return table; - } - - @Override - public void write(DataOutput out) throws IOException { - Text.writeString(out, host); - out.writeInt(port); - Text.writeString(out, user); - Text.writeString(out, passwd); - Text.writeString(out, db); - Text.writeString(out, table); - } - - public void readFields(DataInput in) throws IOException { - host = Text.readString(in); - port = in.readInt(); - user = Text.readString(in); - passwd = Text.readString(in); - db = Text.readString(in); - table = Text.readString(in); - } - - public TMysqlErrorHubInfo toThrift() { - TMysqlErrorHubInfo info = new TMysqlErrorHubInfo(host, port, user, passwd, db, table); - return info; - } - } - - public MysqlLoadErrorHub(MysqlParam mysqlParam) { - Preconditions.checkNotNull(mysqlParam); - param = mysqlParam; - } - - @Override - public List fetchLoadError(long jobId) { - List result = Lists.newArrayList(); - - Connection conn = null; - PreparedStatement stmt = null; - ResultSet resultSet = null; - - conn = MysqlUtil.getConnection( - param.getHost(), param.getPort(), param.getDb(), - param.getUser(), param.getPasswd()); - if (conn == null) { - return result; - } - - String sql = null; - try { - sql = QUERY_SQL_FIRST + param.getTable() + QUERY_SQL_LAST; - stmt = conn.prepareStatement(sql); - stmt.setLong(1, jobId); - stmt.setLong(2, MAX_LINE); - stmt.setQueryTimeout(STMT_TIMEOUT_S); - resultSet = stmt.executeQuery(); - while (resultSet.next()) { - String msg = resultSet.getString("error_msg"); - result.add(new ErrorMsg(jobId, msg)); - } - } catch (SQLException e) { - LOG.warn("fail to query load error mysql. " - + "sql={}, table={}, jobId={}, max_line={}, exception={}", - sql, param.getTable(), jobId, MAX_LINE, e); - } finally { - if (resultSet != null) { - try { - resultSet.close(); - } catch (SQLException sqlEx) { - LOG.warn("fail to close resultSet of load error."); - } - resultSet = null; - } - - if (stmt != null) { - try { - stmt.close(); - } catch (SQLException sqlEx) { - LOG.warn("fail to close stmt."); - } - stmt = null; - } - - MysqlUtil.closeConnection(conn); - conn = null; - } - - return result; - } - - @Override - public boolean prepare() { - return true; - } - - @Override - public boolean close() { - return true; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java index 4c359c72d7..03ef40086d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java @@ -528,8 +528,8 @@ public class EditLog { break; } case OperationType.OP_SET_LOAD_ERROR_HUB: { - final LoadErrorHub.Param param = (LoadErrorHub.Param) journal.getData(); - env.getLoadInstance().setLoadErrorHubInfo(param); + // final LoadErrorHub.Param param = (LoadErrorHub.Param) journal.getData(); + // ignore load error hub break; } case OperationType.OP_UPDATE_CLUSTER_AND_BACKENDS: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java index 896aaae698..0198a6b20f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaPersistMethod.java @@ -71,12 +71,6 @@ public class MetaPersistMethod { metaPersistMethod.writeMethod = Env.class.getDeclaredMethod("saveDb", CountingDataOutputStream.class, long.class); break; - case "loadJob": - metaPersistMethod.readMethod = - Env.class.getDeclaredMethod("loadLoadJob", DataInputStream.class, long.class); - metaPersistMethod.writeMethod = - Env.class.getDeclaredMethod("saveLoadJob", CountingDataOutputStream.class, long.class); - break; case "alterJob": metaPersistMethod.readMethod = Env.class.getDeclaredMethod("loadAlterJob", DataInputStream.class, long.class); diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java index f502bcdc5d..37a89ad001 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/MetaReader.java @@ -81,11 +81,21 @@ public class MetaReader { checksum = env.loadHeader(dis, metaHeader, checksum); // 3. Read other meta modules // Modules must be read in the order in which the metadata was written - for (MetaIndex metaIndex : metaFooter.metaIndices) { + for (int i = 0; i < metaFooter.metaIndices.size(); ++i) { + MetaIndex metaIndex = metaFooter.metaIndices.get(i); if (metaIndex.name.equals("header")) { // skip meta header, which has been read before. continue; } + // Should skip some bytes because ignore some meta, such as load job + if (metaIndex.name.equals("loadJob")) { + LOG.info("This is {}, skip {} bytes", metaIndex.name, + metaFooter.metaIndices.get(i + 1).offset - metaIndex.offset); + if (i < metaFooter.metaIndices.size() - 1) { + IOUtils.skipFully(dis, metaFooter.metaIndices.get(i + 1).offset - metaIndex.offset); + } + continue; + } MetaPersistMethod persistMethod = PersistMetaModules.MODULES_MAP.get(metaIndex.name); if (persistMethod == null) { throw new IOException("Unknown meta module: " + metaIndex.name + ". Known modules: " diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java index e4db00bc8e..2074ffe539 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/meta/PersistMetaModules.java @@ -35,7 +35,7 @@ public class PersistMetaModules { public static final List MODULES_IN_ORDER; public static final ImmutableList MODULE_NAMES = ImmutableList.of( - "masterInfo", "frontends", "backends", "datasource", "db", "loadJob", "alterJob", "recycleBin", + "masterInfo", "frontends", "backends", "datasource", "db", "alterJob", "recycleBin", "globalVariable", "cluster", "broker", "resources", "exportJob", "syncJob", "backupHandler", "paloAuth", "transactionState", "colocateTableIndex", "routineLoadJobs", "loadJobV2", "smallFiles", "plugins", "deleteHandler", "sqlBlockRule", "policy", "mtmvJobManager", "cooldownJob"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 1128342804..b8a0074d26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -43,7 +43,6 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; import org.apache.doris.load.BrokerFileGroup; -import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.planner.external.ExternalFileScanNode; @@ -53,7 +52,6 @@ import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TFileType; -import org.apache.doris.thrift.TLoadErrorHubInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanFragmentExecParams; import org.apache.doris.thrift.TQueryGlobals; @@ -266,15 +264,6 @@ public class StreamLoadPlanner { params.setQueryGlobals(queryGlobals); - // set load error hub if exist - LoadErrorHub.Param param = Env.getCurrentEnv().getLoadInstance().getLoadErrorHubInfo(); - if (param != null) { - TLoadErrorHubInfo info = param.toThrift(); - if (info != null) { - params.setLoadErrorHubInfo(info); - } - } - // LOG.debug("stream load txn id: {}, plan: {}", streamLoadTask.getTxnId(), params); return params; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 36ae55b273..d284ce999a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -37,7 +37,6 @@ import org.apache.doris.common.util.ProfileWriter; import org.apache.doris.common.util.RuntimeProfile; import org.apache.doris.common.util.TimeUtils; import org.apache.doris.common.util.VectorizedUtil; -import org.apache.doris.load.LoadErrorHub; import org.apache.doris.load.loadv2.LoadJob; import org.apache.doris.metric.MetricRepo; import org.apache.doris.planner.DataPartition; @@ -78,7 +77,6 @@ import org.apache.doris.thrift.TErrorTabletInfo; import org.apache.doris.thrift.TEsScanRange; import org.apache.doris.thrift.TExecPlanFragmentParams; import org.apache.doris.thrift.TExecPlanFragmentParamsList; -import org.apache.doris.thrift.TLoadErrorHubInfo; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloScanRange; import org.apache.doris.thrift.TPlanFragmentDestination; @@ -2482,15 +2480,6 @@ public class Coordinator { rf.getFilterId().asInt(), rf.toThrift()); } } - if (queryOptions.getQueryType() == TQueryType.LOAD) { - LoadErrorHub.Param param = Env.getCurrentEnv().getLoadInstance().getLoadErrorHubInfo(); - if (param != null) { - TLoadErrorHubInfo info = param.toThrift(); - if (info != null) { - params.setLoadErrorHubInfo(info); - } - } - } paramsList.add(params); } return paramsList; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index a3230949b4..ea4b384fb2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -172,8 +172,6 @@ import org.apache.doris.load.DeleteHandler; import org.apache.doris.load.ExportJob; import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; -import org.apache.doris.load.LoadErrorHub; -import org.apache.doris.load.LoadErrorHub.HubType; import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; import org.apache.doris.load.routineload.RoutineLoadJob; @@ -1240,24 +1238,7 @@ public class ShowExecutor { } } } - - LoadErrorHub.Param param = load.getLoadErrorHubInfo(); - if (param == null || param.getType() == HubType.NULL_TYPE) { - throw new AnalysisException("no load error hub be supplied."); - } - LoadErrorHub errorHub = LoadErrorHub.createHub(param); - List errors = errorHub.fetchLoadError(jobId); - errorHub.close(); - List> rows = Lists.newArrayList(); - for (LoadErrorHub.ErrorMsg error : errors) { - List oneInfo = Lists.newArrayList(); - oneInfo.add(String.valueOf(jobId)); - oneInfo.add(label); - oneInfo.add(error.getMsg()); - rows.add(oneInfo); - } - long limit = showWarningsStmt.getLimitNum(); if (limit != -1L && limit < rows.size()) { rows = rows.subList(0, (int) limit);