From 8be71b69d54a3f8d4fc0fbc75195f301cf2e9308 Mon Sep 17 00:00:00 2001 From: yiguolei Date: Thu, 3 Mar 2022 09:30:54 +0800 Subject: [PATCH] [refactor] remove pusher.cpp and related mock test code (#8288) --- be/src/agent/pusher.cpp | 215 ---------------------- be/src/olap/task/engine_batch_load_task.h | 3 +- be/test/agent/mock_pusher.h | 32 ---- 3 files changed, 1 insertion(+), 249 deletions(-) delete mode 100644 be/src/agent/pusher.cpp delete mode 100644 be/test/agent/mock_pusher.h diff --git a/be/src/agent/pusher.cpp b/be/src/agent/pusher.cpp deleted file mode 100644 index f122d67ce6..0000000000 --- a/be/src/agent/pusher.cpp +++ /dev/null @@ -1,215 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "agent/pusher.h" - -#include - -#include -#include -#include -#include -#include -#include - -#include "agent/cgroups_mgr.h" -#include "boost/lexical_cast.hpp" -#include "gen_cpp/AgentService_types.h" -#include "http/http_client.h" -#include "olap/olap_common.h" -#include "olap/olap_define.h" -#include "olap/olap_engine.h" -#include "olap/olap_table.h" -#include "util/stopwatch.hpp" - -using std::list; -using std::string; -using std::vector; - -namespace doris { - -Pusher::Pusher(OLAPEngine* engine, const TPushReq& push_req) - : _push_req(push_req), _engine(engine) {} - -Pusher::~Pusher() {} - -AgentStatus Pusher::init() { - AgentStatus status = DORIS_SUCCESS; - - // Check replica exist - OLAPTablePtr olap_table; - olap_table = _engine->get_table(_push_req.tablet_id, _push_req.schema_hash); - if (olap_table.get() == nullptr) { - OLAP_LOG_WARNING("get tables failed. tablet_id: %ld, schema_hash: %ld", _push_req.tablet_id, - _push_req.schema_hash); - return DORIS_PUSH_INVALID_TABLE; - } - - // Empty remote_path - if (!_push_req.__isset.http_file_path) { - return status; - } - - // Check remote path - _remote_file_path = _push_req.http_file_path; - - // Get local download path - LOG(INFO) << "start get file. remote_file_path: " << _remote_file_path; - - // Set download param - string tmp_file_dir; - status = _get_tmp_file_dir(olap_table->storage_root_path_name(), &tmp_file_dir); - if (status != DORIS_SUCCESS) { - LOG(WARNING) << "get local path failed. tmp file dir: " << tmp_file_dir; - return status; - } - string tmp_file_name; - _get_file_name_from_path(_push_req.http_file_path, &tmp_file_name); - _local_file_path = tmp_file_dir + "/" + tmp_file_name; - - return status; -} - -// Get replica root path -AgentStatus Pusher::_get_tmp_file_dir(const string& root_path, string* download_path) { - AgentStatus status = DORIS_SUCCESS; - *download_path = root_path + DPP_PREFIX; - - // Check path exist - std::filesystem::path full_path(*download_path); - - if (!std::filesystem::exists(full_path)) { - LOG(INFO) << "download dir not exist: " << *download_path; - std::error_code error_code; - std::filesystem::create_directories(*download_path, error_code); - - if (error_code) { - status = DORIS_ERROR; - LOG(WARNING) << "create download dir failed.path: " << *download_path - << ", error code: " << error_code; - } - } - - return status; -} - -void Pusher::_get_file_name_from_path(const string& file_path, string* file_name) { - size_t found = file_path.find_last_of("/\\"); - pthread_t tid = pthread_self(); - *file_name = file_path.substr(found + 1) + "_" + boost::lexical_cast(tid); -} - -AgentStatus Pusher::process(vector* tablet_infos) { - AgentStatus status = DORIS_SUCCESS; - // Remote file not empty, need to download - if (_push_req.__isset.http_file_path) { - // Get file length and timeout - uint64_t file_size = 0; - uint64_t estimate_time_out = DEFAULT_DOWNLOAD_TIMEOUT; - if (_push_req.__isset.http_file_size) { - file_size = _push_req.http_file_size; - estimate_time_out = file_size / config::download_low_speed_limit_kbps / 1024; - } - if (estimate_time_out < config::download_low_speed_time) { - estimate_time_out = config::download_low_speed_time; - } - bool is_timeout = false; - auto download_cb = [this, estimate_time_out, file_size, &is_timeout](HttpClient* client) { - // Check timeout and set timeout - time_t now = time(nullptr); - if (_push_req.timeout > 0 && _push_req.timeout < now) { - // return status to break this callback - VLOG_NOTICE << "check time out. time_out:" << _push_req.timeout << ", now:" << now; - is_timeout = true; - return Status::OK(); - } - - RETURN_IF_ERROR(client->init(_remote_file_path)); - // sent timeout - uint64_t timeout = _push_req.timeout > 0 ? _push_req.timeout - now : 0; - if (timeout > 0 && timeout < estimate_time_out) { - client->set_timeout_ms(timeout * 1000); - } else { - client->set_timeout_ms(estimate_time_out * 1000); - } - - // download remote file - RETURN_IF_ERROR(client->download(_local_file_path)); - - // check file size - if (_push_req.__isset.http_file_size) { - // Check file size - uint64_t local_file_size = std::filesystem::file_size(_local_file_path); - if (file_size != local_file_size) { - LOG(WARNING) << "download_file size error. file_size=" << file_size - << ", local_file_size=" << local_file_size; - return Status::InternalError("downloaded file's size isn't right"); - } - } - // NOTE: change http_file_path is not good design - _push_req.http_file_path = _local_file_path; - return Status::OK(); - }; - - MonotonicStopWatch stopwatch; - stopwatch.start(); - auto st = HttpClient::execute_with_retry(MAX_RETRY, 1, download_cb); - auto cost = stopwatch.elapsed_time(); - if (cost <= 0) { - cost = 1; - } - if (st.ok() && !is_timeout) { - double rate = -1.0; - if (_push_req.__isset.http_file_size) { - rate = (double)_push_req.http_file_size / (cost / 1000 / 1000 / 1000) / 1024; - } - LOG(INFO) << "down load file success. local_file=" << _local_file_path - << ", remote_file=" << _remote_file_path << ", tablet_id" - << _push_req.tablet_id << ", cost=" << cost / 1000 << "us, file_size" - << _push_req.http_file_size << ", download rage:" << rate << "KB/s"; - } else { - LOG(WARNING) << "down load file failed. remote_file=" << _remote_file_path - << ", tablet=" << _push_req.tablet_id << ", cost=" << cost / 1000 - << "us, errmsg=" << st.get_error_msg() << ", is_timeout=" << is_timeout; - status = DORIS_ERROR; - } - } - - if (status == DORIS_SUCCESS) { - // Load delta file - time_t push_begin = time(nullptr); - OLAPStatus push_status = _engine->push(_push_req, tablet_infos); - time_t push_finish = time(nullptr); - LOG(INFO) << "Push finish, cost time: " << (push_finish - push_begin); - if (push_status == OLAPStatus::OLAP_ERR_PUSH_TRANSACTION_ALREADY_EXIST) { - status = DORIS_PUSH_HAD_LOADED; - } else if (push_status != OLAPStatus::OLAP_SUCCESS) { - status = DORIS_ERROR; - } - } - - // Delete download file - if (std::filesystem::exists(_local_file_path)) { - if (remove(_local_file_path.c_str()) == -1) { - LOG(WARNING) << "can not remove file=" << _local_file_path; - } - } - - return status; -} - -} // namespace doris diff --git a/be/src/olap/task/engine_batch_load_task.h b/be/src/olap/task/engine_batch_load_task.h index 125dc7fc14..03371d70f9 100644 --- a/be/src/olap/task/engine_batch_load_task.h +++ b/be/src/olap/task/engine_batch_load_task.h @@ -46,7 +46,6 @@ public: virtual OLAPStatus execute(); private: - // The initial function of pusher virtual AgentStatus _init(); // The process of push data to olap engine @@ -77,6 +76,6 @@ private: AgentStatus* _res_status; std::string _remote_file_path; std::string _local_file_path; -}; // class Pusher +}; // class EngineBatchLoadTask } // namespace doris #endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H diff --git a/be/test/agent/mock_pusher.h b/be/test/agent/mock_pusher.h deleted file mode 100644 index 8b95b3524e..0000000000 --- a/be/test/agent/mock_pusher.h +++ /dev/null @@ -1,32 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef DORIS_BE_SRC_AGENT_MOCK_MOCK_PUSHER_H -#define DORIS_BE_SRC_AGENT_MOCK_MOCK_PUSHER_H - -#include "gmock/gmock.h" - -namespace doris { - -class MockPusher : public Pusher { -public: - MockPusher(const TPushReq& push_req); - MOCK_METHOD0(init, AgentStatus()); - MOCK_METHOD1(process, AgentStatus(std::vector* tablet_infos)); -}; // class MockPusher -} // namespace doris -#endif // DORIS_BE_SRC_AGENT_SERVICE_PUSHER_H