From a2b299e3b9e33a4a8f0b16fb62758726cd819cf7 Mon Sep 17 00:00:00 2001 From: Zhao Chun Date: Thu, 15 Nov 2018 16:17:23 +0800 Subject: [PATCH] Reduce UT binary size (#314) * Reduce UT binary size Almost every module depend on ExecEnv, and ExecEnv contains all singleton, which make UT binary contains all object files. This patch seperate ExecEnv's initial and destory to anthor file to avoid other file's dependence. And status.cc include debug_util.h which depend tuple.h tuple_row.h, and I move get_stack_trace() to stack_util.cpp to reduce status.cc's dependence. I add USE_RTTI=1 to build rocksdb to avoid linking librocksdb.a Issue: #292 * Update --- .gitignore | 1 + be/src/agent/heartbeat_server.cpp | 10 +- be/src/agent/heartbeat_server.h | 1 + be/src/agent/user_resource_listener.cpp | 5 +- be/src/agent/user_resource_listener.h | 4 +- be/src/common/config.h | 26 +- be/src/common/daemon.cpp | 2 +- be/src/common/status.cpp | 1 - be/src/common/status.h | 1 + be/src/exec/aggregation_node.cpp | 5 +- be/src/exec/blocking_join_node.cpp | 5 +- be/src/exec/broker_scan_node.cpp | 5 +- be/src/exec/exchange_node.cpp | 3 +- be/src/exec/hash_join_node.cpp | 13 +- be/src/exec/hash_table.cpp | 3 +- .../exec/new_partitioned_aggregation_node.cc | 3 +- be/src/exec/new_partitioned_hash_table.cc | 3 +- be/src/exec/olap_rewrite_node.cpp | 3 +- be/src/exec/olap_scan_node.cpp | 3 +- be/src/exec/olap_scan_node.h | 3 +- be/src/exec/olap_scanner.cpp | 4 +- be/src/exec/olap_table_info.cpp | 5 +- be/src/exec/olap_table_sink.cpp | 4 +- be/src/exec/partitioned_aggregation_node.cc | 5 +- be/src/exec/partitioned_hash_table.cc | 3 +- be/src/exec/row_batch_list.h | 4 +- .../exec/schema_scanner/frontend_helper.cpp | 2 + be/src/exec/schema_scanner/frontend_helper.h | 7 +- be/src/exec/select_node.cpp | 3 +- be/src/exec/topn_node.cpp | 3 +- be/src/http/action/mini_load.cpp | 1 + be/src/olap/file_helper.cpp | 8 +- be/src/olap/file_helper.h | 9 + be/src/olap/olap_engine.cpp | 11 +- be/src/olap/olap_engine.h | 4 - be/src/runtime/CMakeLists.txt | 1 + be/src/runtime/bufferpool/buffer_allocator.cc | 4 +- be/src/runtime/bufferpool/system_allocator.cc | 10 +- be/src/runtime/data_spliter.cpp | 3 +- be/src/runtime/dpp_sink.cpp | 1 + be/src/runtime/etl_job_mgr.cpp | 1 + be/src/runtime/exec_env.cpp | 277 +--------------- be/src/runtime/exec_env.h | 300 +++++++----------- be/src/runtime/exec_env_init.cpp | 215 +++++++++++++ be/src/runtime/export_sink.cpp | 3 +- be/src/runtime/fragment_mgr.cpp | 1 + be/src/runtime/mem_pool.cpp | 4 +- be/src/runtime/mem_pool.h | 3 +- be/src/runtime/plan_fragment_executor.cpp | 2 +- be/src/runtime/row_batch.cpp | 9 + be/src/runtime/row_batch.h | 1 + be/src/runtime/test_env.cc | 2 +- be/src/runtime/tuple.cpp | 41 ++- be/src/runtime/tuple.h | 3 + be/src/runtime/tuple_row.cpp | 17 + be/src/runtime/tuple_row.h | 1 + be/src/service/CMakeLists.txt | 1 + be/src/service/doris_main.cpp | 22 +- be/src/service/http_service.cpp | 119 +++++++ be/src/service/http_service.h | 44 +++ be/src/util/CMakeLists.txt | 1 + be/src/util/debug_util.cpp | 80 ----- be/src/util/debug_util.h | 13 - be/src/util/disk_info.cpp | 2 - be/src/util/mem_info.cpp | 1 - be/src/util/stack_util.cpp | 34 ++ be/src/util/stack_util.h | 29 ++ be/test/exec/olap_table_sink_test.cpp | 56 ++-- be/test/http/stream_load_test.cpp | 48 +-- env.sh | 2 +- thirdparty/build-thirdparty.sh | 40 +-- thirdparty/download-thirdparty.sh | 27 +- 72 files changed, 846 insertions(+), 745 deletions(-) create mode 100644 be/src/runtime/exec_env_init.cpp create mode 100644 be/src/service/http_service.cpp create mode 100644 be/src/service/http_service.h create mode 100644 be/src/util/stack_util.cpp create mode 100644 be/src/util/stack_util.h diff --git a/.gitignore b/.gitignore index 34e503496f..b9bce7bf83 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,4 @@ output docs/build gensrc/build fe/target +thirdparty/src diff --git a/be/src/agent/heartbeat_server.cpp b/be/src/agent/heartbeat_server.cpp index c8d41103cd..ec40b7909e 100644 --- a/be/src/agent/heartbeat_server.cpp +++ b/be/src/agent/heartbeat_server.cpp @@ -18,15 +18,17 @@ #include "agent/heartbeat_server.h" #include #include -#include "boost/filesystem.hpp" -#include "thrift/TProcessor.h" -#include "gen_cpp/HeartbeatService.h" -#include "gen_cpp/Status_types.h" + +#include +#include #include "common/status.h" +#include "gen_cpp/HeartbeatService.h" +#include "gen_cpp/Status_types.h" #include "olap/olap_engine.h" #include "olap/utils.h" #include "service/backend_options.h" +#include "util/thrift_server.h" using std::fstream; using std::nothrow; diff --git a/be/src/agent/heartbeat_server.h b/be/src/agent/heartbeat_server.h index dc78db392d..1a36187ca4 100644 --- a/be/src/agent/heartbeat_server.h +++ b/be/src/agent/heartbeat_server.h @@ -31,6 +31,7 @@ namespace doris { const uint32_t HEARTBEAT_INTERVAL = 10; class OLAPEngine; class Status; +class ThriftServer; class HeartbeatServer : public HeartbeatServiceIf { public: diff --git a/be/src/agent/user_resource_listener.cpp b/be/src/agent/user_resource_listener.cpp index 631cd7c795..cd563f4570 100644 --- a/be/src/agent/user_resource_listener.cpp +++ b/be/src/agent/user_resource_listener.cpp @@ -25,6 +25,7 @@ #include #include "common/logging.h" #include "gen_cpp/FrontendService.h" +#include "runtime/client_cache.h" namespace doris { @@ -38,7 +39,7 @@ using apache::thrift::transport::TTransportException; UserResourceListener::UserResourceListener(ExecEnv* exec_env, const TMasterInfo& master_info) : _master_info(master_info), - _master_client_cache(exec_env->frontend_client_cache()), + _exec_env(exec_env), _cgroups_mgr(*(exec_env->cgroups_mgr())) { } @@ -64,7 +65,7 @@ void UserResourceListener::update_users_resource(int64_t new_version) { // Call fe to get latest user resource Status master_status; // using 500ms as default timeout value - FrontendServiceConnection client(_master_client_cache, + FrontendServiceConnection client(_exec_env->frontend_client_cache(), _master_info.network_address, 500, &master_status); diff --git a/be/src/agent/user_resource_listener.h b/be/src/agent/user_resource_listener.h index 16736d973a..2fb6d97139 100644 --- a/be/src/agent/user_resource_listener.h +++ b/be/src/agent/user_resource_listener.h @@ -28,6 +28,8 @@ namespace doris { +class ExecEnv; + class UserResourceListener : public TopicListener { public: @@ -40,7 +42,7 @@ public: const TTopicUpdate& topic_update); private: const TMasterInfo& _master_info; - FrontendServiceClientCache* _master_client_cache; + ExecEnv* _exec_env; CgroupsMgr& _cgroups_mgr; // Call cgroups mgr to update user's cgroups resource share // Also refresh local user resource's cache diff --git a/be/src/common/config.h b/be/src/common/config.h index 9288635f10..84eae91519 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -337,42 +337,36 @@ namespace config { // cpu count CONF_Int32(flags_num_cores, "32"); - CONF_Bool(FLAGS_thread_creation_fault_injection, "false"); + CONF_Bool(thread_creation_fault_injection, "false"); // Set this to encrypt and perform an integrity // check on all data spilled to disk during a query - CONF_Bool(FLAGS_disk_spill_encryption, "false"); + CONF_Bool(disk_spill_encryption, "false"); // Writable scratch directories - CONF_String(FLAGS_scratch_dirs, "/tmp"); + CONF_String(scratch_dirs, "/tmp"); // If false and --scratch_dirs contains multiple directories on the same device, // then only the first writable directory is used - CONF_Bool(FLAGS_allow_multiple_scratch_dirs_per_device, "false"); + CONF_Bool(allow_multiple_scratch_dirs_per_device, "false"); // linux transparent huge page - CONF_Bool(FLAGS_madvise_huge_pages, "false"); + CONF_Bool(madvise_huge_pages, "false"); // whether use mmap to allocate memory - CONF_Bool(FLAGS_mmap_buffers, "false"); - - // whether or not user mem pool - CONF_Bool(FLAGS_disable_mem_pools, "false"); + CONF_Bool(mmap_buffers, "false"); // max memory can be allocated by buffer pool - CONF_String(FLAGS_buffer_pool_limit, "80G"); + CONF_String(buffer_pool_limit, "80G"); // clean page can be hold by buffer pool - CONF_String(FLAGS_buffer_pool_clean_pages_limit, "20G"); - - // buffer pool can support min memory allocated - CONF_Int32(FLAGS_min_buffer_size, "1024"); + CONF_String(buffer_pool_clean_pages_limit, "20G"); // Sleep time in seconds between memory maintenance iterations - CONF_Int64(FLAGS_memory_maintenance_sleep_time_s, "10"); + CONF_Int64(memory_maintenance_sleep_time_s, "10"); // Aligement - CONF_Int32(FLAGS_MEMORY_MAX_ALIGNMENT, "16"); + CONF_Int32(memory_max_alignment, "16"); // write buffer size before flush CONF_Int32(write_buffer_size, "104857600"); diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index fa5aaa6b4b..53a327bb54 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -81,7 +81,7 @@ void* tcmalloc_gc_thread(void* dummy) { void* memory_maintenance_thread(void* dummy) { while (true) { - sleep(config::FLAGS_memory_maintenance_sleep_time_s); + sleep(config::memory_maintenance_sleep_time_s); ExecEnv* env = ExecEnv::GetInstance(); // ExecEnv may not have been created yet or this may be the catalogd or statestored, // which don't have ExecEnvs. diff --git a/be/src/common/status.cpp b/be/src/common/status.cpp index 33bf2a5ff7..c66a108cfe 100644 --- a/be/src/common/status.cpp +++ b/be/src/common/status.cpp @@ -20,7 +20,6 @@ #include #include "common/logging.h" -#include "util/debug_util.h" namespace doris { diff --git a/be/src/common/status.h b/be/src/common/status.h index ff8c1628e0..040baf8c5e 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -25,6 +25,7 @@ #include "common/compiler_util.h" #include "gen_cpp/Status_types.h" // for TStatus #include "gen_cpp/status.pb.h" // for PStatus +#include "util/stack_util.h" // for PStatus namespace doris { diff --git a/be/src/exec/aggregation_node.cpp b/be/src/exec/aggregation_node.cpp index 214a41712e..1a0e96ddb1 100644 --- a/be/src/exec/aggregation_node.cpp +++ b/be/src/exec/aggregation_node.cpp @@ -40,7 +40,6 @@ #include "runtime/string_value.hpp" #include "runtime/tuple.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" using llvm::BasicBlock; @@ -207,7 +206,7 @@ Status AggregationNode::open(RuntimeState* state) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); VLOG_ROW << "id=" << id() << " input row: " - << print_row(row, _children[0]->row_desc()); + << row->to_string(_children[0]->row_desc()); } } @@ -291,7 +290,7 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* row->set_tuple(0, output_tuple); if (ExecNode::eval_conjuncts(ctxs, num_ctxs, row)) { - VLOG_ROW << "output row: " << print_row(row, row_desc()); + VLOG_ROW << "output row: " << row->to_string(row_desc()); row_batch->commit_last_row(); ++_num_rows_returned; diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp index 1b406b05f2..af305d42dd 100644 --- a/be/src/exec/blocking_join_node.cpp +++ b/be/src/exec/blocking_join_node.cpp @@ -22,7 +22,6 @@ #include "exprs/expr.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/PlanNodes_types.h" @@ -191,9 +190,9 @@ std::string BlockingJoinNode::get_left_child_row_string(TupleRow* row) { std::find(_build_tuple_idx_ptr, _build_tuple_idx_ptr + _build_tuple_size, i); if (is_build_tuple != _build_tuple_idx_ptr + _build_tuple_size) { - out << print_tuple(NULL, *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(NULL, *row_desc().tuple_descriptors()[i]); } else { - out << print_tuple(row->get_tuple(i), *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(row->get_tuple(i), *row_desc().tuple_descriptors()[i]); } } diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index d6373c4609..cd0b3b746e 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -26,7 +26,6 @@ #include "runtime/dpp_sink_internal.h" #include "exec/broker_scanner.h" #include "exprs/expr.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" namespace doris { @@ -213,7 +212,7 @@ Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* for (int i = 0; i < row_batch->num_rows(); ++i) { TupleRow* row = row_batch->get_row(i); VLOG_ROW << "BrokerScanNode output row: " - << print_tuple(row->get_tuple(0), *_tuple_desc); + << Tuple::to_string(row->get_tuple(0), *_tuple_desc); } } @@ -331,7 +330,7 @@ Status BrokerScanNode::scanner_scan( std::stringstream error_msg; error_msg << "No corresponding partition, partition id: " << partition_id; - _runtime_state->append_error_msg_to_file(print_tuple(tuple, *_tuple_desc), + _runtime_state->append_error_msg_to_file(Tuple::to_string(tuple, *_tuple_desc), error_msg.str()); continue; } diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp index e68e3e2ed7..840c875f9d 100644 --- a/be/src/exec/exchange_node.cpp +++ b/be/src/exec/exchange_node.cpp @@ -24,7 +24,6 @@ #include "runtime/exec_env.h" #include "runtime/runtime_state.h" #include "runtime/row_batch.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/PlanNodes_types.h" @@ -172,7 +171,7 @@ Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* } if (VLOG_ROW_IS_ON) { - VLOG_ROW << "ExchangeNode output batch: " << print_batch(output_batch); + VLOG_ROW << "ExchangeNode output batch: " << output_batch->to_string(); } COUNTER_SET(_rows_returned_counter, _num_rows_returned); diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp index 8f3e682b43..378f8f5a0a 100644 --- a/be/src/exec/hash_join_node.cpp +++ b/be/src/exec/hash_join_node.cpp @@ -26,7 +26,6 @@ #include "exprs/slot_ref.h" #include "runtime/row_batch.h" #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/PlanNodes_types.h" @@ -444,7 +443,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo VLOG_ROW << "probe row: " << get_probe_row_output_string(_current_probe_row); while (_hash_tbl_iterator.has_next()) { TupleRow* matched_build_row = _hash_tbl_iterator.get_row(); - VLOG_ROW << "matched_build_row: " << print_row(matched_build_row, child(1)->row_desc()); + VLOG_ROW << "matched_build_row: " << matched_build_row->to_string(child(1)->row_desc()); if ((_join_op == TJoinOp::RIGHT_ANTI_JOIN || _join_op == TJoinOp::RIGHT_SEMI_JOIN) && _hash_tbl_iterator.matched()) { @@ -508,7 +507,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo _hash_tbl_iterator.next(); if (eval_conjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { out_batch->commit_last_row(); - VLOG_ROW << "match row: " << print_row(out_row, row_desc()); + VLOG_ROW << "match row: " << out_row->to_string(row_desc()); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); @@ -528,7 +527,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo if (eval_conjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { out_batch->commit_last_row(); - VLOG_ROW << "match row: " << print_row(out_row, row_desc()); + VLOG_ROW << "match row: " << out_row->to_string(row_desc()); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); _matched_probe = true; @@ -628,7 +627,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo create_output_row(out_row, NULL, build_row); if (eval_conjuncts(conjunct_ctxs, num_conjunct_ctxs, out_row)) { out_batch->commit_last_row(); - VLOG_ROW << "match row: " << print_row(out_row, row_desc()); + VLOG_ROW << "match row: " << out_row->to_string(row_desc()); ++_num_rows_returned; COUNTER_SET(_rows_returned_counter, _num_rows_returned); @@ -718,9 +717,9 @@ string HashJoinNode::get_probe_row_output_string(TupleRow* probe_row) { std::find(_build_tuple_idx_ptr, _build_tuple_idx_ptr + _build_tuple_size, i); if (is_build_tuple != _build_tuple_idx_ptr + _build_tuple_size) { - out << print_tuple(NULL, *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(NULL, *row_desc().tuple_descriptors()[i]); } else { - out << print_tuple(probe_row->get_tuple(i), *row_desc().tuple_descriptors()[i]); + out << Tuple::to_string(probe_row->get_tuple(i), *row_desc().tuple_descriptors()[i]); } } diff --git a/be/src/exec/hash_table.cpp b/be/src/exec/hash_table.cpp index 692ef848bf..1625febdeb 100644 --- a/be/src/exec/hash_table.cpp +++ b/be/src/exec/hash_table.cpp @@ -25,7 +25,6 @@ #include "runtime/string_value.hpp" #include "runtime/mem_tracker.h" #include "runtime/runtime_state.h" -#include "util/debug_util.h" #include "util/doris_metrics.h" using llvm::BasicBlock; @@ -313,7 +312,7 @@ std::string HashTable::debug_string(bool skip_empty, const RowDescriptor* desc) if (desc == NULL) { ss << node_idx << "(" << (void*)node->data() << ")"; } else { - ss << (void*)node->data() << " " << print_row(node->data(), *desc); + ss << (void*)node->data() << " " << node->data()->to_string(*desc); } node_idx = node->_next_idx; diff --git a/be/src/exec/new_partitioned_aggregation_node.cc b/be/src/exec/new_partitioned_aggregation_node.cc index 8f4736e82d..da063f92e6 100644 --- a/be/src/exec/new_partitioned_aggregation_node.cc +++ b/be/src/exec/new_partitioned_aggregation_node.cc @@ -44,7 +44,6 @@ #include "runtime/tuple_row.h" #include "runtime/tuple.h" #include "udf/udf_internal.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/Exprs_types.h" @@ -313,7 +312,7 @@ Status NewPartitionedAggregationNode::open(RuntimeState* state) { if (UNLIKELY(VLOG_ROW_IS_ON)) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); - VLOG_ROW << "input row: " << print_row(row, _children[0]->row_desc()); + VLOG_ROW << "input row: " << row->to_string(_children[0]->row_desc()); } } diff --git a/be/src/exec/new_partitioned_hash_table.cc b/be/src/exec/new_partitioned_hash_table.cc index 56b06c34b7..948cefa732 100644 --- a/be/src/exec/new_partitioned_hash_table.cc +++ b/be/src/exec/new_partitioned_hash_table.cc @@ -31,7 +31,6 @@ #include "runtime/raw_value.h" #include "runtime/runtime_state.h" #include "runtime/string_value.h" -#include "util/debug_util.h" #include "util/doris_metrics.h" #include "common/names.h" @@ -566,7 +565,7 @@ void NewPartitionedHashTable::DebugStringTuple(std::stringstream& ss, HtData& ht } if (desc != NULL) { Tuple* row[num_build_tuples_]; - ss << " " << print_row(GetRow(htdata, reinterpret_cast(row)), *desc); + ss << " " << GetRow(htdata, reinterpret_cast(row))->to_string(*desc); } } diff --git a/be/src/exec/olap_rewrite_node.cpp b/be/src/exec/olap_rewrite_node.cpp index 44317ec4e7..611edc4979 100644 --- a/be/src/exec/olap_rewrite_node.cpp +++ b/be/src/exec/olap_rewrite_node.cpp @@ -25,7 +25,6 @@ #include "runtime/row_batch.h" #include "runtime/raw_value.h" #include "runtime/tuple.h" -#include "util/debug_util.h" namespace doris { @@ -232,7 +231,7 @@ bool OlapRewriteNode::copy_rows(RuntimeState* state, RowBatch* output_batch) { if (VLOG_ROW_IS_ON) { for (int i = 0; i < output_batch->num_rows(); ++i) { TupleRow* row = output_batch->get_row(i); - VLOG_ROW << "OlapRewriteNode input row: " << print_row(row, row_desc()); + VLOG_ROW << "OlapRewriteNode input row: " << row->to_string(row_desc()); } } diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp index 80cb4e8dbf..cc1897e9ca 100644 --- a/be/src/exec/olap_scan_node.cpp +++ b/be/src/exec/olap_scan_node.cpp @@ -38,6 +38,7 @@ #include "util/runtime_profile.h" #include "util/thread_pool.hpp" #include "util/debug_util.h" +#include "util/priority_thread_pool.hpp" #include "agent/cgroups_mgr.h" #include "common/resource_tls.h" #include @@ -289,7 +290,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo for (int i = 0; i < row_batch->num_rows(); ++i) { TupleRow* row = row_batch->get_row(i); VLOG_ROW << "OlapScanNode output row: " - << print_tuple(row->get_tuple(0), *_tuple_desc); + << Tuple::to_string(row->get_tuple(0), *_tuple_desc); } } __sync_fetch_and_sub(&_buffered_bytes, diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h index ef5f266812..0db834d0a0 100644 --- a/be/src/exec/olap_scan_node.h +++ b/be/src/exec/olap_scan_node.h @@ -35,7 +35,6 @@ #include "runtime/row_batch_interface.hpp" #include "runtime/vectorized_row_batch.h" #include "util/progress_updater.h" -#include "util/debug_util.h" namespace doris { @@ -125,7 +124,7 @@ protected: while (!h.empty()) { HeapType v = h.top(); - s << "\nID: " << v.id << " Value:" << print_tuple(v.tuple, *_tuple_desc); + s << "\nID: " << v.id << " Value:" << Tuple::to_string(v.tuple, *_tuple_desc); h.pop(); } diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp index dda2fefdca..31b92f9b4e 100644 --- a/be/src/exec/olap_scanner.cpp +++ b/be/src/exec/olap_scanner.cpp @@ -260,7 +260,7 @@ Status OlapScanner::get_batch( _convert_row_to_tuple(tuple); if (VLOG_ROW_IS_ON) { - VLOG_ROW << "OlapScanner input row: " << print_tuple(tuple, *_tuple_desc); + VLOG_ROW << "OlapScanner input row: " << Tuple::to_string(tuple, *_tuple_desc); } // 3.4 Set tuple to RowBatch(not commited) @@ -312,7 +312,7 @@ Status OlapScanner::get_batch( } } if (VLOG_ROW_IS_ON) { - VLOG_ROW << "OlapScanner output row: " << print_tuple(tuple, *_tuple_desc); + VLOG_ROW << "OlapScanner output row: " << Tuple::to_string(tuple, *_tuple_desc); } // check direct && pushdown conjuncts success then commit tuple diff --git a/be/src/exec/olap_table_info.cpp b/be/src/exec/olap_table_info.cpp index cc94b54fb3..04e8f9f8f0 100644 --- a/be/src/exec/olap_table_info.cpp +++ b/be/src/exec/olap_table_info.cpp @@ -21,7 +21,6 @@ #include "runtime/mem_tracker.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/string_parser.hpp" namespace doris { @@ -124,8 +123,8 @@ std::string OlapTableSchemaParam::debug_string() const { std::string OlapTablePartition::debug_string(TupleDescriptor* tuple_desc) const { std::stringstream ss; ss << "(id=" << id - << ",start_key=" << print_tuple(start_key, *tuple_desc) - << ",end_key=" << print_tuple(end_key, *tuple_desc) + << ",start_key=" << Tuple::to_string(start_key, *tuple_desc) + << ",end_key=" << Tuple::to_string(end_key, *tuple_desc) << ",num_buckets=" << num_buckets << ",indexes=["; int idx = 0; diff --git a/be/src/exec/olap_table_sink.cpp b/be/src/exec/olap_table_sink.cpp index da6c94cb6a..5bf3fd93fc 100644 --- a/be/src/exec/olap_table_sink.cpp +++ b/be/src/exec/olap_table_sink.cpp @@ -24,7 +24,6 @@ #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/brpc_stub_cache.h" #include "util/uid_util.h" @@ -554,7 +553,8 @@ Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) { uint32_t dist_hash = 0; if (!_partition->find_tablet(tuple, &partition, &dist_hash)) { std::stringstream ss; - ss << "no partition for this tuple. tuple=" << print_tuple(tuple, *_output_tuple_desc); + ss << "no partition for this tuple. tuple=" + << Tuple::to_string(tuple, *_output_tuple_desc); #if BE_TEST LOG(INFO) << ss.str(); #else diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc index 506179aeec..dfddb3b85b 100644 --- a/be/src/exec/partitioned_aggregation_node.cc +++ b/be/src/exec/partitioned_aggregation_node.cc @@ -37,7 +37,6 @@ #include "runtime/tuple.h" #include "runtime/tuple_row.h" #include "udf/udf_internal.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "gen_cpp/Exprs_types.h" @@ -240,7 +239,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { for (int i = 0; i < batch.num_rows(); ++i) { TupleRow* row = batch.get_row(i); VLOG_ROW << "partition-agg-node input row: " - << print_row(row, _children[0]->row_desc()); + << row->to_string(_children[0]->row_desc()); } } @@ -250,7 +249,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) { } else if (_probe_expr_ctxs.empty()) { RETURN_IF_ERROR(process_batch_no_grouping(&batch)); } else { - // VLOG_ROW << "partition-agg-node batch: " << print_batch(&batch); + // VLOG_ROW << "partition-agg-node batch: " << batch->to_string(); // There is grouping, so we will do partitioned aggregation. RETURN_IF_ERROR(process_batch(&batch, _ht_ctx.get())); } diff --git a/be/src/exec/partitioned_hash_table.cc b/be/src/exec/partitioned_hash_table.cc index 8798bc9738..e14bdd6b09 100644 --- a/be/src/exec/partitioned_hash_table.cc +++ b/be/src/exec/partitioned_hash_table.cc @@ -27,7 +27,6 @@ #include "runtime/raw_value.h" #include "runtime/runtime_state.h" #include "runtime/string_value.hpp" -#include "util/debug_util.h" #include "util/doris_metrics.h" // using namespace llvm; @@ -391,7 +390,7 @@ void PartitionedHashTable::debug_string_tuple( } if (desc != NULL) { Tuple* row[_num_build_tuples]; - ss << " " << print_row(get_row(htdata, reinterpret_cast(row)), *desc); + ss << " " << get_row(htdata, reinterpret_cast(row))->to_string(*desc); } } diff --git a/be/src/exec/row_batch_list.h b/be/src/exec/row_batch_list.h index 50f5815b0e..1fb2597e2c 100644 --- a/be/src/exec/row_batch_list.h +++ b/be/src/exec/row_batch_list.h @@ -23,7 +23,7 @@ #include "common/logging.h" #include "runtime/row_batch.h" -#include "util/debug_util.h" +#include "runtime/tuple_row.h" namespace doris { @@ -108,7 +108,7 @@ public: RowBatchList::TupleRowIterator it = iterator(); while (!it.at_end()) { - out << " " << print_row(it.get_row(), desc); + out << " " << it.get_row()->to_string(desc); it.next(); } diff --git a/be/src/exec/schema_scanner/frontend_helper.cpp b/be/src/exec/schema_scanner/frontend_helper.cpp index 5826b9f309..aa4c7693f8 100644 --- a/be/src/exec/schema_scanner/frontend_helper.cpp +++ b/be/src/exec/schema_scanner/frontend_helper.cpp @@ -29,6 +29,7 @@ #include "gen_cpp/FrontendService_types.h" #include "gen_cpp/FrontendService.h" #include "runtime/runtime_state.h" +#include "runtime/exec_env.h" #include "runtime/row_batch.h" #include "runtime/string_value.h" #include "runtime/tuple_row.h" @@ -37,6 +38,7 @@ #include "util/network_util.h" #include "util/thrift_util.h" #include "util/runtime_profile.h" +#include "runtime/client_cache.h" namespace doris { diff --git a/be/src/exec/schema_scanner/frontend_helper.h b/be/src/exec/schema_scanner/frontend_helper.h index 5a4a4bcacf..3f86bde2e8 100644 --- a/be/src/exec/schema_scanner/frontend_helper.h +++ b/be/src/exec/schema_scanner/frontend_helper.h @@ -19,11 +19,14 @@ #define DORIS_BE_SRC_QUERY_EXEC_SCHEMA_SCANNER_FRONTEND_HELPER_H #include "common/status.h" -#include "runtime/exec_env.h" #include "gen_cpp/FrontendService_types.h" namespace doris { +class ExecEnv; +class FrontendServiceClient; +template class ClientConnection; + // this class is a helper for jni call. easy for unit test class FrontendHelper { public: @@ -59,7 +62,7 @@ public: static Status rpc( const std::string& ip, const int32_t port, - std::function callback, + std::function&)> callback, int timeout_ms = 5000); private: static ExecEnv* _s_exec_env; diff --git a/be/src/exec/select_node.cpp b/be/src/exec/select_node.cpp index ef634db00f..b962e5e3d3 100644 --- a/be/src/exec/select_node.cpp +++ b/be/src/exec/select_node.cpp @@ -21,7 +21,6 @@ #include "runtime/row_batch.h" #include "runtime/runtime_state.h" #include "runtime/raw_value.h" -#include "util/debug_util.h" namespace doris { @@ -125,7 +124,7 @@ bool SelectNode::copy_rows(RowBatch* output_batch) { if (VLOG_ROW_IS_ON) { for (int i = 0; i < output_batch->num_rows(); ++i) { TupleRow* row = output_batch->get_row(i); - VLOG_ROW << "SelectNode input row: " << print_row(row, row_desc()); + VLOG_ROW << "SelectNode input row: " << row->to_string(row_desc()); } } diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp index 1fbb96a48e..4f24a8d706 100644 --- a/be/src/exec/topn_node.cpp +++ b/be/src/exec/topn_node.cpp @@ -29,7 +29,6 @@ #include "runtime/runtime_state.h" #include "runtime/tuple.h" #include "runtime/tuple_row.h" -#include "util/debug_util.h" #include "util/runtime_profile.h" #include "util/tuple_row_compare.h" #include @@ -164,7 +163,7 @@ Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } if (VLOG_ROW_IS_ON) { - VLOG_ROW << "TOPN-node output row: " << print_batch(row_batch); + VLOG_ROW << "TOPN-node output row: " << row_batch->to_string(); } *eos = _get_next_iter == _sorted_top_n.end(); diff --git a/be/src/http/action/mini_load.cpp b/be/src/http/action/mini_load.cpp index 0ac1bec8ca..819589a952 100644 --- a/be/src/http/action/mini_load.cpp +++ b/be/src/http/action/mini_load.cpp @@ -51,6 +51,7 @@ #include "runtime/exec_env.h" #include "runtime/fragment_mgr.h" #include "runtime/load_path_mgr.h" +#include "runtime/client_cache.h" #include "gen_cpp/MasterService_types.h" #include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/FrontendService.h" diff --git a/be/src/olap/file_helper.cpp b/be/src/olap/file_helper.cpp index 2ca937e847..8d536043d5 100644 --- a/be/src/olap/file_helper.cpp +++ b/be/src/olap/file_helper.cpp @@ -27,7 +27,6 @@ #include "olap/olap_common.h" #include "olap/olap_define.h" -#include "olap/olap_engine.h" #include "olap/utils.h" #include "util/debug_util.h" @@ -35,6 +34,8 @@ using std::string; namespace doris { +Cache* FileHandler::_s_fd_cache; + FileHandler::FileHandler() : _fd(-1), _wr_length(0), @@ -85,7 +86,7 @@ OLAPStatus FileHandler::open_with_cache(const string& file_name, int flag) { } CacheKey key(file_name.c_str(), file_name.size()); - Cache* fd_cache = OLAPEngine::get_instance()->file_descriptor_lru_cache(); + Cache* fd_cache = get_fd_cache(); _cache_handle = fd_cache->lookup(key); if (NULL != _cache_handle) { FileDescriptor* file_desc = @@ -146,8 +147,7 @@ OLAPStatus FileHandler::open_with_mode(const string& file_name, int flag, int mo } OLAPStatus FileHandler::release() { - Cache* fd_cache = OLAPEngine::get_instance()->file_descriptor_lru_cache(); - fd_cache->release(_cache_handle); + get_fd_cache()->release(_cache_handle); _cache_handle = NULL; _is_using_cache = false; return OLAP_SUCCESS; diff --git a/be/src/olap/file_helper.h b/be/src/olap/file_helper.h index 0f3a8f841d..2f4279c09a 100644 --- a/be/src/olap/file_helper.h +++ b/be/src/olap/file_helper.h @@ -101,7 +101,16 @@ public: SAFE_DELETE(file_desc); } + static Cache* get_fd_cache() { + return _s_fd_cache; + } + static void set_fd_cache(Cache* cache) { + _s_fd_cache = cache; + } + private: + static Cache* _s_fd_cache; + int _fd; off_t _wr_length; const int64_t _cache_threshold = 1<<19; diff --git a/be/src/olap/olap_engine.cpp b/be/src/olap/olap_engine.cpp index 830bc571a1..0b40a93be1 100644 --- a/be/src/olap/olap_engine.cpp +++ b/be/src/olap/olap_engine.cpp @@ -115,7 +115,6 @@ OLAPEngine::OLAPEngine(const EngineOptions& options) _is_all_cluster_id_exist(true), _is_drop_tables(false), _global_table_id(0), - _file_descriptor_lru_cache(NULL), _index_stream_lru_cache(NULL), _tablet_stat_cache_update_time_ms(0), _snapshot_base_id(0) { @@ -303,12 +302,13 @@ OLAPStatus OLAPEngine::open() { _update_storage_medium_type_count(); - _file_descriptor_lru_cache = new_lru_cache(config::file_descriptor_cache_capacity); - if (_file_descriptor_lru_cache == NULL) { + auto cache = new_lru_cache(config::file_descriptor_cache_capacity); + if (cache == nullptr) { OLAP_LOG_WARNING("failed to init file descriptor LRUCache"); _tablet_map.clear(); return OLAP_ERR_INIT_FAILED; } + FileHandler::set_fd_cache(cache); // 初始化LRUCache // cache大小可通过配置文件配置 @@ -605,7 +605,8 @@ OLAPStatus OLAPEngine::_get_root_path_capacity( OLAPStatus OLAPEngine::clear() { // 删除lru中所有内容,其实进程退出这么做本身意义不大,但对单测和更容易发现问题还是有很大意义的 - SAFE_DELETE(_file_descriptor_lru_cache); + delete FileHandler::get_fd_cache(); + FileHandler::set_fd_cache(nullptr); SAFE_DELETE(_index_stream_lru_cache); _tablet_map.clear(); @@ -1651,7 +1652,7 @@ bool OLAPEngine::_can_do_compaction(OLAPTablePtr table) { void OLAPEngine::start_clean_fd_cache() { OLAP_LOG_TRACE("start clean file descritpor cache"); - _file_descriptor_lru_cache->prune(); + FileHandler::get_fd_cache()->prune(); OLAP_LOG_TRACE("end clean file descritpor cache"); } diff --git a/be/src/olap/olap_engine.h b/be/src/olap/olap_engine.h index 1cd4e704b9..15a8b904d6 100644 --- a/be/src/olap/olap_engine.h +++ b/be/src/olap/olap_engine.h @@ -197,10 +197,6 @@ public: return _index_stream_lru_cache; } - Cache* file_descriptor_lru_cache() { - return _file_descriptor_lru_cache; - } - // 清理trash和snapshot文件,返回清理后的磁盘使用量 OLAPStatus start_trash_sweep(double *usage); diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 9eabd66757..5dcf63bb40 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -36,6 +36,7 @@ add_library(Runtime STATIC datetime_value.cpp descriptors.cpp exec_env.cpp + exec_env_init.cpp lib_cache.cpp mem_pool.cpp plan_fragment_executor.cpp diff --git a/be/src/runtime/bufferpool/buffer_allocator.cc b/be/src/runtime/bufferpool/buffer_allocator.cc index 0e421263f1..2e07a4bf4c 100644 --- a/be/src/runtime/bufferpool/buffer_allocator.cc +++ b/be/src/runtime/bufferpool/buffer_allocator.cc @@ -492,7 +492,7 @@ BufferPool::FreeBufferArena::~FreeBufferArena() { void BufferPool::FreeBufferArena::AddFreeBuffer(BufferHandle&& buffer) { lock_guard al(lock_); - if (config::FLAGS_disable_mem_pools) { + if (config::disable_mem_pools) { int64_t len = buffer.len(); parent_->system_allocator_->Free(move(buffer)); parent_->system_bytes_remaining_.add(len); @@ -635,7 +635,7 @@ std::pair BufferPool::FreeBufferArena::FreeSystemMemory( } void BufferPool::FreeBufferArena::AddCleanPage(Page* page) { - bool eviction_needed = config::FLAGS_disable_mem_pools + bool eviction_needed = config::disable_mem_pools || DecreaseBytesRemaining( page->len, true, &parent_->clean_page_bytes_remaining_) == 0; lock_guard al(lock_); diff --git a/be/src/runtime/bufferpool/system_allocator.cc b/be/src/runtime/bufferpool/system_allocator.cc index 8fe4cf59ba..0b79dc66b6 100644 --- a/be/src/runtime/bufferpool/system_allocator.cc +++ b/be/src/runtime/bufferpool/system_allocator.cc @@ -63,7 +63,7 @@ Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) DCHECK(BitUtil::IsPowerOf2(len)) << len; uint8_t* buffer_mem; - if (config::FLAGS_mmap_buffers) { + if (config::mmap_buffers) { RETURN_IF_ERROR(AllocateViaMMap(len, &buffer_mem)); } else { RETURN_IF_ERROR(AllocateViaMalloc(len, &buffer_mem)); @@ -74,7 +74,7 @@ Status SystemAllocator::Allocate(int64_t len, BufferPool::BufferHandle* buffer) Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { int64_t map_len = len; - bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::FLAGS_madvise_huge_pages; + bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; if (use_huge_pages) { // Map an extra huge page so we can fix up the alignment if needed. map_len += HUGE_PAGE_SIZE; @@ -116,7 +116,7 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) { } Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) { - bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::FLAGS_madvise_huge_pages; + bool use_huge_pages = len % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; // Allocate, aligned to the page size that we expect to back the memory range. // This ensures that it can be backed by a whole pages, rather than parts of pages. size_t alignment = use_huge_pages ? HUGE_PAGE_SIZE : SMALL_PAGE_SIZE; @@ -144,11 +144,11 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) { } void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) { - if (config::FLAGS_mmap_buffers) { + if (config::mmap_buffers) { int rc = munmap(buffer.data(), buffer.len()); DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno; } else { - bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::FLAGS_madvise_huge_pages; + bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages; if (use_huge_pages) { // Undo the madvise so that is isn't a candidate to be newly backed by huge pages. // We depend on TCMalloc's "aggressive decommit" mode decommitting the physical diff --git a/be/src/runtime/data_spliter.cpp b/be/src/runtime/data_spliter.cpp index 08158fba70..eb12836705 100644 --- a/be/src/runtime/data_spliter.cpp +++ b/be/src/runtime/data_spliter.cpp @@ -32,7 +32,6 @@ #include "runtime/load_path_mgr.h" #include "runtime/mem_tracker.h" #include "util/runtime_profile.h" -#include "util/debug_util.h" #include "util/file_utils.h" #include "gen_cpp/DataSinks_types.h" @@ -240,7 +239,7 @@ Status DataSpliter::process_one_row(RuntimeState* state, TupleRow* row) { state->set_normal_row_number(state->get_normal_row_number() - 1); state->append_error_msg_to_file( - print_row(row, _row_desc), + row->to_string(_row_desc), status.get_error_msg()); return Status::OK; } diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp index 47b7c14929..0604d12888 100644 --- a/be/src/runtime/dpp_sink.cpp +++ b/be/src/runtime/dpp_sink.cpp @@ -35,6 +35,7 @@ #include "gen_cpp/Types_types.h" #include "util/count_down_latch.hpp" #include "util/debug_util.h" +#include "util/thread_pool.hpp" #include "olap/field.h" namespace doris { diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp index c28dd07231..4c3321a415 100644 --- a/be/src/runtime/etl_job_mgr.cpp +++ b/be/src/runtime/etl_job_mgr.cpp @@ -29,6 +29,7 @@ #include "runtime/fragment_mgr.h" #include "runtime/data_spliter.h" #include "runtime/runtime_state.h" +#include "runtime/client_cache.h" #include "util/file_utils.h" #include "gen_cpp/MasterService_types.h" #include "gen_cpp/HeartbeatService_types.h" diff --git a/be/src/runtime/exec_env.cpp b/be/src/runtime/exec_env.cpp index 652887bc34..8eea2c9afc 100644 --- a/be/src/runtime/exec_env.cpp +++ b/be/src/runtime/exec_env.cpp @@ -17,291 +17,18 @@ #include "runtime/exec_env.h" -#include - -#include - -#include "common/logging.h" -#include "runtime/broker_mgr.h" -#include "runtime/bufferpool/buffer_pool.h" -#include "runtime/client_cache.h" -#include "runtime/data_stream_mgr.h" -#include "runtime/disk_io_mgr.h" -#include "runtime/result_buffer_mgr.h" -#include "runtime/mem_tracker.h" -#include "runtime/thread_resource_mgr.h" -#include "runtime/fragment_mgr.h" -#include "runtime/tablet_writer_mgr.h" -#include "runtime/tmp_file_mgr.h" -#include "runtime/bufferpool/reservation_tracker.h" -#include "util/metrics.h" -#include "util/network_util.h" -#include "http/web_page_handler.h" -#include "http/default_path_handlers.h" -#include "util/parse_util.h" -#include "util/mem_info.h" -#include "util/debug_util.h" -#include "http/ev_http_server.h" -#include "http/action/mini_load.h" -#include "http/action/checksum_action.h" -#include "http/action/health_action.h" -#include "http/action/reload_tablet_action.h" -#include "http/action/restore_tablet_action.h" -#include "http/action/snapshot_action.h" -#include "http/action/pprof_actions.h" -#include "http/action/metrics_action.h" -#include "http/action/meta_action.h" -#include "http/action/stream_load.h" -#include "http/download_action.h" -#include "http/monitor_action.h" -#include "http/http_method.h" -#include "olap/olap_engine.h" -#include "util/network_util.h" -#include "util/bfd_parser.h" -#include "runtime/etl_job_mgr.h" -#include "runtime/load_path_mgr.h" -#include "runtime/load_stream_mgr.h" -#include "runtime/pull_load_task_mgr.h" -#include "runtime/snapshot_loader.h" -#include "util/pretty_printer.h" -#include "util/doris_metrics.h" -#include "util/brpc_stub_cache.h" -#include "gen_cpp/BackendService.h" -#include "gen_cpp/FrontendService.h" -#include "gen_cpp/TPaloBrokerService.h" #include "gen_cpp/HeartbeatService_types.h" namespace doris { -ExecEnv* ExecEnv::_exec_env = nullptr; - -ExecEnv::ExecEnv() - : _thread_mgr(new ThreadResourceMgr), - _master_info(new TMasterInfo()), - _load_stream_mgr(new LoadStreamMgr()), - _brpc_stub_cache(new BrpcStubCache()) { +ExecEnv::ExecEnv() { } -ExecEnv::ExecEnv(const std::vector& paths) : - _store_paths(paths), - _stream_mgr(new DataStreamMgr()), - _result_mgr(new ResultBufferMgr()), - _client_cache(new BackendServiceClientCache()), - _frontend_client_cache(new FrontendServiceClientCache()), - _broker_client_cache(new BrokerServiceClientCache()), - _ev_http_server(new EvHttpServer(config::webserver_port, config::webserver_num_workers)), - _web_page_handler(new WebPageHandler(_ev_http_server.get())), - _mem_tracker(NULL), - _pool_mem_trackers(new PoolMemTrackerRegistry), - _thread_mgr(new ThreadResourceMgr), - _thread_pool(new PriorityThreadPool( - config::doris_scanner_thread_pool_thread_num, - config::doris_scanner_thread_pool_queue_size)), - _etl_thread_pool(new ThreadPool( - config::etl_thread_pool_size, - config::etl_thread_pool_queue_size)), - _cgroups_mgr(new CgroupsMgr(this, config::doris_cgroups)), - _fragment_mgr(new FragmentMgr(this)), - _master_info(new TMasterInfo()), - _etl_job_mgr(new EtlJobMgr(this)), - _load_path_mgr(new LoadPathMgr(this)), - _disk_io_mgr(new DiskIoMgr()), - _tmp_file_mgr(new TmpFileMgr(this)), - _bfd_parser(BfdParser::create()), - _pull_load_task_mgr(new PullLoadTaskMgr(config::pull_load_task_dir)), - _broker_mgr(new BrokerMgr(this)), - _tablet_writer_mgr(new TabletWriterMgr(this)), - _load_stream_mgr(new LoadStreamMgr()), - _snapshot_loader(new SnapshotLoader(this)), - _brpc_stub_cache(new BrpcStubCache()), - _enable_webserver(true), - _tz_database(TimezoneDatabase()) { - _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); - _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); - _broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker"); - _result_mgr->init(); - _cgroups_mgr->init_cgroups(); - _etl_job_mgr->init(); - Status status = _load_path_mgr->init(); - if (!status.ok()) { - LOG(ERROR) << "load path mgr init failed." << status.get_error_msg(); - exit(-1); - } - status = _pull_load_task_mgr->init(); - if (!status.ok()) { - LOG(ERROR) << "pull load task manager init failed." << status.get_error_msg(); - exit(-1); - } - _broker_mgr->init(); - _exec_env = this; -} - -ExecEnv::~ExecEnv() {} - -Status ExecEnv::init_for_tests() { - _mem_tracker.reset(new MemTracker(-1)); - return Status::OK; -} - -Status ExecEnv::start_services() { - LOG(INFO) << "Starting global services"; - - // Initialize global memory limit. - int64_t bytes_limit = 0; - bool is_percent = false; - // --mem_limit="" means no memory limit - bytes_limit = ParseUtil::parse_mem_spec(config::mem_limit, &is_percent); - - if (bytes_limit < 0) { - return Status("Failed to parse mem limit from '" + config::mem_limit + "'."); - } - - std::stringstream ss; - if (!BitUtil::IsPowerOf2(config::FLAGS_min_buffer_size)) { - ss << "--min_buffer_size must be a power-of-two: " << config::FLAGS_min_buffer_size; - return Status(ss.str()); - } - - int64_t buffer_pool_limit = ParseUtil::parse_mem_spec(config::FLAGS_buffer_pool_limit, - &is_percent); - if (buffer_pool_limit <= 0) { - ss << "Invalid --buffer_pool_limit value, must be a percentage or " - "positive bytes value or percentage: " << config::FLAGS_buffer_pool_limit; - return Status(ss.str()); - } - buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, config::FLAGS_min_buffer_size); - - int64_t clean_pages_limit = ParseUtil::parse_mem_spec(config::FLAGS_buffer_pool_clean_pages_limit, - &is_percent); - if (clean_pages_limit <= 0) { - ss << "Invalid --buffer_pool_clean_pages_limit value, must be a percentage or " - "positive bytes value or percentage: " << config::FLAGS_buffer_pool_clean_pages_limit; - return Status(ss.str()); - } - - init_buffer_pool(config::FLAGS_min_buffer_size, buffer_pool_limit, clean_pages_limit); - // Limit of 0 means no memory limit. - if (bytes_limit > 0) { - _mem_tracker.reset(new MemTracker(bytes_limit)); - } - - if (bytes_limit > MemInfo::physical_mem()) { - LOG(WARNING) << "Memory limit " - << PrettyPrinter::print(bytes_limit, TUnit::BYTES) - << " exceeds physical memory of " - << PrettyPrinter::print(MemInfo::physical_mem(), - TUnit::BYTES); - } - - LOG(INFO) << "Using global memory limit: " - << PrettyPrinter::print(bytes_limit, TUnit::BYTES); - - RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker.get())); - - // Start services in order to ensure that dependencies between them are met - if (_enable_webserver) { - RETURN_IF_ERROR(start_webserver()); - } else { - LOG(INFO) << "Webserver is disabled"; - } - - RETURN_IF_ERROR(_tmp_file_mgr->init(DorisMetrics::metrics())); - - return Status::OK; -} - -Status ExecEnv::start_webserver() { - add_default_path_handlers(_web_page_handler.get(), _mem_tracker.get()); - _ev_http_server->register_handler(HttpMethod::PUT, - "/api/{db}/{table}/_load", - new MiniLoadAction(this)); - _ev_http_server->register_handler(HttpMethod::PUT, - "/api/{db}/{table}/_stream_load", - new StreamLoadAction(this)); - - std::vector allow_paths; - for (auto& path : _store_paths) { - allow_paths.emplace_back(path.path); - } - DownloadAction* download_action = new DownloadAction(this, allow_paths); - // = new DownloadAction(this, config::mini_load_download_path); - _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); - _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); - - DownloadAction* tablet_download_action = new DownloadAction(this, allow_paths); - _ev_http_server->register_handler(HttpMethod::HEAD, - "/api/_tablet/_download", - tablet_download_action); - _ev_http_server->register_handler(HttpMethod::GET, - "/api/_tablet/_download", - tablet_download_action); - - DownloadAction* error_log_download_action = new DownloadAction( - this, _load_path_mgr->get_load_error_file_dir()); - _ev_http_server->register_handler( - HttpMethod::GET, "/api/_load_error_log", error_log_download_action); - _ev_http_server->register_handler( - HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); - - // Register monitor - MonitorAction* monitor_action = new MonitorAction(); - monitor_action->register_module("etl_mgr", etl_job_mgr()); - monitor_action->register_module("fragment_mgr", fragment_mgr()); - _ev_http_server->register_handler(HttpMethod::GET, "/_monitor/{module}", monitor_action); - - // Register BE health action - HealthAction* health_action = new HealthAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); - - // register pprof actions - PprofActions::setup(this, _ev_http_server.get()); - - { - auto action = _object_pool.add(new MetricsAction(DorisMetrics::metrics())); - _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); - } - - MetaAction* meta_action = new MetaAction(HEADER); - _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}", meta_action); - -#ifndef BE_TEST - // Register BE checksum action - ChecksumAction* checksum_action = new ChecksumAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); - - // Register BE reload tablet action - ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); - - RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(this); - _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet", restore_tablet_action); - - // Register BE snapshot action - SnapshotAction* snapshot_action = new SnapshotAction(this); - _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); -#endif - - RETURN_IF_ERROR(_ev_http_server->start()); - return Status::OK; -} - -uint32_t ExecEnv::cluster_id() { - return OLAPEngine::get_instance()->effective_cluster_id(); -} - -void ExecEnv::init_buffer_pool(int64_t min_page_size, int64_t capacity, int64_t clean_pages_limit) { - DCHECK(_buffer_pool == nullptr); - _buffer_pool.reset(new BufferPool(min_page_size, capacity, clean_pages_limit)); - _buffer_reservation.reset(new ReservationTracker); - _buffer_reservation->InitRootTracker(nullptr, capacity); +ExecEnv::~ExecEnv() { } const std::string& ExecEnv::token() const { return _master_info->token; } -MetricRegistry* ExecEnv::metrics() const { - return DorisMetrics::metrics(); -} - } diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index dd3884b2d9..9d29551289 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -18,49 +18,44 @@ #ifndef DORIS_BE_RUNTIME_EXEC_ENV_H #define DORIS_BE_RUNTIME_EXEC_ENV_H -#include -#include -#include - -#include "agent/cgroups_mgr.h" #include "common/status.h" -#include "common/object_pool.h" -#include "exprs/timestamp_functions.h" -#include "runtime/client_cache.h" -#include "runtime/lib_cache.h" -#include "util/thread_pool.hpp" -#include "util/priority_thread_pool.hpp" -#include "util/thread_pool.hpp" #include "olap/options.h" namespace doris { -class DataStreamMgr; -class ResultBufferMgr; -class TestExecEnv; -class EvHttpServer; -class WebPageHandler; -class MemTracker; -class PoolMemTrackerRegistry; -class ThreadResourceMgr; -class FragmentMgr; -class TMasterInfo; -class EtlJobMgr; -class LoadPathMgr; -class DiskIoMgr; -class TmpFileMgr; class BfdParser; -class PullLoadTaskMgr; class BrokerMgr; -class MetricRegistry; -class BufferPool; -class ReservationTracker; -class TabletWriterMgr; -class LoadStreamMgr; -class ConnectionManager; -class SnapshotLoader; class BrpcStubCache; +class BufferPool; +class CgroupsMgr; +class DataStreamMgr; +class DiskIoMgr; +class EtlJobMgr; +class EvHttpServer; +class FragmentMgr; +class LoadPathMgr; +class LoadStreamMgr; +class MemTracker; +class MetricRegistry; class OLAPEngine; +class PoolMemTrackerRegistry; +class PriorityThreadPool; +class PullLoadTaskMgr; +class ReservationTracker; +class ResultBufferMgr; +class SnapshotLoader; +class TMasterInfo; +class TabletWriterMgr; +class TestExecEnv; +class ThreadPool; +class ThreadResourceMgr; +class TmpFileMgr; +class WebPageHandler; + +class BackendServiceClient; +class FrontendServiceClient; +class TPaloBrokerServiceClient; +template class ClientCache; // Execution environment for queries/plan fragments. // Contains all required global structures, and handles to @@ -68,184 +63,101 @@ class OLAPEngine; // once to properly initialise service state. class ExecEnv { public: - ExecEnv(const std::vector& store_paths); + // Initial exec enviorment. must call this to init all + static Status init(ExecEnv* env, const std::vector& store_paths); + static void destroy(ExecEnv* exec_env); + + /// Returns the first created exec env instance. In a normal doris, this is + /// the only instance. In test setups with multiple ExecEnv's per process, + /// we return the most recently created instance. + static ExecEnv* GetInstance() { + static ExecEnv s_exec_env; + return &s_exec_env; + } // only used for test ExecEnv(); - /// Returns the first created exec env instance. In a normal impalad, this is - /// the only instance. In test setups with multiple ExecEnv's per process, - /// we return the most recently created instance. - static ExecEnv* GetInstance() { return _exec_env; } - // Empty destructor because the compiler-generated one requires full // declarations for classes in scoped_ptrs. - virtual ~ExecEnv(); - - uint32_t cluster_id(); + ~ExecEnv(); const std::string& token() const; - - MetricRegistry* metrics() const; - - DataStreamMgr* stream_mgr() { - return _stream_mgr.get(); - } - ResultBufferMgr* result_mgr() { - return _result_mgr.get(); - } - BackendServiceClientCache* client_cache() { - return _client_cache.get(); - } - FrontendServiceClientCache* frontend_client_cache() { - return _frontend_client_cache.get(); - } - BrokerServiceClientCache* broker_client_cache() { - return _broker_client_cache.get(); - } - WebPageHandler* web_page_handler() { - return _web_page_handler.get(); - } - MemTracker* process_mem_tracker() { - return _mem_tracker.get(); - } - PoolMemTrackerRegistry* pool_mem_trackers() { - return _pool_mem_trackers.get(); - } - ThreadResourceMgr* thread_mgr() { - return _thread_mgr.get(); - } - PriorityThreadPool* thread_pool() { - return _thread_pool.get(); - } - ThreadPool* etl_thread_pool() { - return _etl_thread_pool.get(); - } - CgroupsMgr* cgroups_mgr() { - return _cgroups_mgr.get(); - } - FragmentMgr* fragment_mgr() { - return _fragment_mgr.get(); - } - TMasterInfo* master_info() { - return _master_info.get(); - } - EtlJobMgr* etl_job_mgr() { - return _etl_job_mgr.get(); - } - LoadPathMgr* load_path_mgr() { - return _load_path_mgr.get(); - } - DiskIoMgr* disk_io_mgr() { - return _disk_io_mgr.get(); - } - TmpFileMgr* tmp_file_mgr() { - return _tmp_file_mgr.get(); - } - - BfdParser* bfd_parser() const { - return _bfd_parser.get(); - } - - PullLoadTaskMgr* pull_load_task_mgr() const { - return _pull_load_task_mgr.get(); - } - - BrokerMgr* broker_mgr() const { - return _broker_mgr.get(); - } - - SnapshotLoader* snapshot_loader() const { - return _snapshot_loader.get(); - } - - BrpcStubCache* brpc_stub_cache() const { - return _brpc_stub_cache.get(); - } - - void set_enable_webserver(bool enable) { - _enable_webserver = enable; - } - - // Starts any dependent services in their correct order - virtual Status start_services(); - - // Initializes the exec env for running FE tests. - Status init_for_tests(); - - ReservationTracker* buffer_reservation() { - return _buffer_reservation.get(); - } - - BufferPool* buffer_pool() { - return _buffer_pool.get(); - } - - TabletWriterMgr* tablet_writer_mgr() { - return _tablet_writer_mgr.get(); - } - - LoadStreamMgr* load_stream_mgr() { - return _load_stream_mgr.get(); - } - - const std::vector& store_paths() const { - return _store_paths; - } - - void set_store_paths(const std::vector& paths) { - _store_paths = paths; - } - + MetricRegistry* metrics() const { return _metrics; } + DataStreamMgr* stream_mgr() { return _stream_mgr; } + ResultBufferMgr* result_mgr() { return _result_mgr; } + ClientCache* client_cache() { return _client_cache; } + ClientCache* frontend_client_cache() { return _frontend_client_cache; } + ClientCache* broker_client_cache() { return _broker_client_cache; } + MemTracker* process_mem_tracker() { return _mem_tracker; } + PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } + ThreadResourceMgr* thread_mgr() { return _thread_mgr; } + PriorityThreadPool* thread_pool() { return _thread_pool; } + ThreadPool* etl_thread_pool() { return _etl_thread_pool; } + CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; } + FragmentMgr* fragment_mgr() { return _fragment_mgr; } + TMasterInfo* master_info() { return _master_info; } + EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; } + LoadPathMgr* load_path_mgr() { return _load_path_mgr; } + DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; } + TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; } + BfdParser* bfd_parser() const { return _bfd_parser; } + PullLoadTaskMgr* pull_load_task_mgr() const { return _pull_load_task_mgr; } + BrokerMgr* broker_mgr() const { return _broker_mgr; } + SnapshotLoader* snapshot_loader() const { return _snapshot_loader; } + BrpcStubCache* brpc_stub_cache() const { return _brpc_stub_cache; } + ReservationTracker* buffer_reservation() { return _buffer_reservation; } + BufferPool* buffer_pool() { return _buffer_pool; } + TabletWriterMgr* tablet_writer_mgr() { return _tablet_writer_mgr; } + LoadStreamMgr* load_stream_mgr() { return _load_stream_mgr; } + const std::vector& store_paths() const { return _store_paths; } + void set_store_paths(const std::vector& paths) { _store_paths = paths; } OLAPEngine* olap_engine() { return _olap_engine; } - void set_olap_engine(OLAPEngine* olap_engine) { _olap_engine = olap_engine; } private: - Status start_webserver(); + Status _init(const std::vector& store_paths); + void _destory(); + + Status _init_mem_tracker(); + /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. + void _init_buffer_pool(int64_t min_page_len, + int64_t capacity, int64_t clean_pages_limit); + +private: std::vector _store_paths; // Leave protected so that subclasses can override - boost::scoped_ptr _stream_mgr; - boost::scoped_ptr _result_mgr; - boost::scoped_ptr _client_cache; - boost::scoped_ptr _frontend_client_cache; - std::unique_ptr_broker_client_cache; - boost::scoped_ptr _ev_http_server; - boost::scoped_ptr _web_page_handler; - boost::scoped_ptr _mem_tracker; - boost::scoped_ptr _pool_mem_trackers; - boost::scoped_ptr _thread_mgr; - boost::scoped_ptr _thread_pool; - boost::scoped_ptr _etl_thread_pool; - boost::scoped_ptr _cgroups_mgr; - boost::scoped_ptr _fragment_mgr; - boost::scoped_ptr _master_info; - boost::scoped_ptr _etl_job_mgr; - boost::scoped_ptr _load_path_mgr; - boost::scoped_ptr _disk_io_mgr; - boost::scoped_ptr _tmp_file_mgr; + MetricRegistry* _metrics = nullptr; + DataStreamMgr* _stream_mgr = nullptr; + ResultBufferMgr* _result_mgr = nullptr; + ClientCache* _client_cache = nullptr; + ClientCache* _frontend_client_cache = nullptr; + ClientCache* _broker_client_cache = nullptr; + MemTracker* _mem_tracker = nullptr; + PoolMemTrackerRegistry* _pool_mem_trackers = nullptr; + ThreadResourceMgr* _thread_mgr = nullptr; + PriorityThreadPool* _thread_pool = nullptr; + ThreadPool* _etl_thread_pool = nullptr; + CgroupsMgr* _cgroups_mgr = nullptr; + FragmentMgr* _fragment_mgr = nullptr; + TMasterInfo* _master_info = nullptr; + EtlJobMgr* _etl_job_mgr = nullptr; + LoadPathMgr* _load_path_mgr = nullptr; + DiskIoMgr* _disk_io_mgr = nullptr; + TmpFileMgr* _tmp_file_mgr = nullptr; - std::unique_ptr _bfd_parser; - std::unique_ptr _pull_load_task_mgr; - std::unique_ptr _broker_mgr; - std::unique_ptr _tablet_writer_mgr; - std::unique_ptr _load_stream_mgr; - std::unique_ptr _snapshot_loader; - std::unique_ptr _brpc_stub_cache; - bool _enable_webserver; + BfdParser* _bfd_parser = nullptr; + PullLoadTaskMgr* _pull_load_task_mgr = nullptr; + BrokerMgr* _broker_mgr = nullptr; + TabletWriterMgr* _tablet_writer_mgr = nullptr; + LoadStreamMgr* _load_stream_mgr = nullptr; + SnapshotLoader* _snapshot_loader = nullptr; + BrpcStubCache* _brpc_stub_cache = nullptr; - boost::scoped_ptr _buffer_reservation; - boost::scoped_ptr _buffer_pool; + ReservationTracker* _buffer_reservation = nullptr; + BufferPool* _buffer_pool = nullptr; OLAPEngine* _olap_engine = nullptr; - - ObjectPool _object_pool; -private: - static ExecEnv* _exec_env; - TimezoneDatabase _tz_database; - - /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. - void init_buffer_pool(int64_t min_page_len, int64_t capacity, int64_t clean_pages_limit); }; } diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp new file mode 100644 index 0000000000..1584567e25 --- /dev/null +++ b/be/src/runtime/exec_env_init.cpp @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/exec_env.h" + +#include + +#include "common/config.h" +#include "common/logging.h" +#include "runtime/broker_mgr.h" +#include "runtime/bufferpool/buffer_pool.h" +#include "runtime/client_cache.h" +#include "runtime/data_stream_mgr.h" +#include "runtime/disk_io_mgr.h" +#include "runtime/result_buffer_mgr.h" +#include "runtime/mem_tracker.h" +#include "runtime/thread_resource_mgr.h" +#include "runtime/fragment_mgr.h" +#include "runtime/tablet_writer_mgr.h" +#include "runtime/tmp_file_mgr.h" +#include "runtime/bufferpool/reservation_tracker.h" +#include "util/metrics.h" +#include "util/network_util.h" +#include "util/parse_util.h" +#include "util/mem_info.h" +#include "util/debug_util.h" +#include "olap/olap_engine.h" +#include "util/network_util.h" +#include "util/bfd_parser.h" +#include "runtime/etl_job_mgr.h" +#include "runtime/load_path_mgr.h" +#include "runtime/load_stream_mgr.h" +#include "runtime/pull_load_task_mgr.h" +#include "runtime/snapshot_loader.h" +#include "util/pretty_printer.h" +#include "util/doris_metrics.h" +#include "util/brpc_stub_cache.h" +#include "util/priority_thread_pool.hpp" +#include "agent/cgroups_mgr.h" +#include "util/thread_pool.hpp" +#include "gen_cpp/BackendService.h" +#include "gen_cpp/FrontendService.h" +#include "gen_cpp/TPaloBrokerService.h" +#include "gen_cpp/HeartbeatService_types.h" + +namespace doris { + +Status ExecEnv::init(ExecEnv* env, const std::vector& store_paths) { + return env->_init(store_paths); +} + +Status ExecEnv::_init(const std::vector& store_paths) { + _store_paths = store_paths; + + _metrics = DorisMetrics::metrics(); + _stream_mgr = new DataStreamMgr(); + _result_mgr = new ResultBufferMgr(); + _client_cache = new BackendServiceClientCache(); + _frontend_client_cache = new FrontendServiceClientCache(); + _broker_client_cache = new BrokerServiceClientCache(); + _mem_tracker = nullptr; + _pool_mem_trackers = new PoolMemTrackerRegistry(); + _thread_mgr = new ThreadResourceMgr(); + _thread_pool = new PriorityThreadPool( + config::doris_scanner_thread_pool_thread_num, + config::doris_scanner_thread_pool_queue_size); + _etl_thread_pool = new ThreadPool( + config::etl_thread_pool_size, + config::etl_thread_pool_queue_size); + _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups); + _fragment_mgr = new FragmentMgr(this); + _master_info = new TMasterInfo(); + _etl_job_mgr = new EtlJobMgr(this); + _load_path_mgr = new LoadPathMgr(this); + _disk_io_mgr = new DiskIoMgr(); + _tmp_file_mgr = new TmpFileMgr(this), + _bfd_parser = BfdParser::create(); + _pull_load_task_mgr = new PullLoadTaskMgr(config::pull_load_task_dir); + _broker_mgr = new BrokerMgr(this); + _tablet_writer_mgr = new TabletWriterMgr(this); + _load_stream_mgr = new LoadStreamMgr(); + _snapshot_loader = new SnapshotLoader(this); + _brpc_stub_cache = new BrpcStubCache(); + + _client_cache->init_metrics(DorisMetrics::metrics(), "backend"); + _frontend_client_cache->init_metrics(DorisMetrics::metrics(), "frontend"); + _broker_client_cache->init_metrics(DorisMetrics::metrics(), "broker"); + _result_mgr->init(); + _cgroups_mgr->init_cgroups(); + _etl_job_mgr->init(); + Status status = _load_path_mgr->init(); + if (!status.ok()) { + LOG(ERROR) << "load path mgr init failed." << status.get_error_msg(); + exit(-1); + } + status = _pull_load_task_mgr->init(); + if (!status.ok()) { + LOG(ERROR) << "pull load task manager init failed." << status.get_error_msg(); + exit(-1); + } + _broker_mgr->init(); + return _init_mem_tracker(); +} + +Status ExecEnv::_init_mem_tracker() { + // Initialize global memory limit. + int64_t bytes_limit = 0; + bool is_percent = false; + std::stringstream ss; + // --mem_limit="" means no memory limit + bytes_limit = ParseUtil::parse_mem_spec(config::mem_limit, &is_percent); + if (bytes_limit < 0) { + ss << "Failed to parse mem limit from '" + config::mem_limit + "'."; + return Status(ss.str()); + } + + if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { + ss << "--min_buffer_size must be a power-of-two: " << config::min_buffer_size; + return Status(ss.str()); + } + + int64_t buffer_pool_limit = ParseUtil::parse_mem_spec( + config::buffer_pool_limit, &is_percent); + if (buffer_pool_limit <= 0) { + ss << "Invalid --buffer_pool_limit value, must be a percentage or " + "positive bytes value or percentage: " << config::buffer_pool_limit; + return Status(ss.str()); + } + buffer_pool_limit = BitUtil::RoundDown(buffer_pool_limit, config::min_buffer_size); + + int64_t clean_pages_limit = ParseUtil::parse_mem_spec( + config::buffer_pool_clean_pages_limit, &is_percent); + if (clean_pages_limit <= 0) { + ss << "Invalid --buffer_pool_clean_pages_limit value, must be a percentage or " + "positive bytes value or percentage: " << config::buffer_pool_clean_pages_limit; + return Status(ss.str()); + } + + _init_buffer_pool(config::min_buffer_size, buffer_pool_limit, clean_pages_limit); + + // Limit of 0 means no memory limit. + if (bytes_limit > 0) { + _mem_tracker = new MemTracker(bytes_limit); + } + + if (bytes_limit > MemInfo::physical_mem()) { + LOG(WARNING) << "Memory limit " + << PrettyPrinter::print(bytes_limit, TUnit::BYTES) + << " exceeds physical memory of " + << PrettyPrinter::print(MemInfo::physical_mem(), + TUnit::BYTES); + } + + LOG(INFO) << "Using global memory limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES); + RETURN_IF_ERROR(_disk_io_mgr->init(_mem_tracker)); + RETURN_IF_ERROR(_tmp_file_mgr->init(DorisMetrics::metrics())); + return Status::OK; +} + +void ExecEnv::_init_buffer_pool(int64_t min_page_size, + int64_t capacity, + int64_t clean_pages_limit) { + DCHECK(_buffer_pool == nullptr); + _buffer_pool = new BufferPool(min_page_size, capacity, clean_pages_limit); + _buffer_reservation = new ReservationTracker(); + _buffer_reservation->InitRootTracker(nullptr, capacity); +} + +void ExecEnv::_destory() { + delete _brpc_stub_cache; + delete _snapshot_loader; + delete _load_stream_mgr; + delete _tablet_writer_mgr; + delete _broker_mgr; + delete _pull_load_task_mgr; + delete _bfd_parser; + delete _tmp_file_mgr; + delete _disk_io_mgr; + delete _load_path_mgr; + delete _etl_job_mgr; + delete _master_info; + delete _fragment_mgr; + delete _cgroups_mgr; + delete _etl_thread_pool; + delete _thread_pool; + delete _thread_mgr; + delete _pool_mem_trackers; + delete _mem_tracker; + delete _broker_client_cache; + delete _frontend_client_cache; + delete _client_cache; + delete _result_mgr; + delete _stream_mgr; + _metrics = nullptr; +} + +void ExecEnv::destroy(ExecEnv* env) { + env->_destory(); +} + +} diff --git a/be/src/runtime/export_sink.cpp b/be/src/runtime/export_sink.cpp index 146ea1f73f..c93fb02637 100644 --- a/be/src/runtime/export_sink.cpp +++ b/be/src/runtime/export_sink.cpp @@ -25,7 +25,6 @@ #include "runtime/tuple_row.h" #include "runtime/row_batch.h" #include "util/runtime_profile.h" -#include "util/debug_util.h" #include "util/types.h" #include "exec/local_file_writer.h" #include "exec/broker_writer.h" @@ -89,7 +88,7 @@ Status ExportSink::open(RuntimeState* state) { } Status ExportSink::send(RuntimeState* state, RowBatch* batch) { - VLOG_ROW << "debug: export_sink send batch: " << print_batch(batch); + VLOG_ROW << "debug: export_sink send batch: " << batch->to_string(); SCOPED_TIMER(_profile->total_time_counter()); int num_rows = batch->num_rows(); // we send at most 1024 rows at a time diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9358c18b7a..42efed7f7b 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -33,6 +33,7 @@ #include "util/debug_util.h" #include "util/doris_metrics.h" #include "util/thrift_util.h" +#include "runtime/client_cache.h" #include "gen_cpp/PaloInternalService_types.h" #include "gen_cpp/Types_types.h" #include "gen_cpp/DataSinks_types.h" diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index fb753a96db..8b9a6e72de 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -126,7 +126,7 @@ bool MemPool::FindChunk(size_t min_size, bool check_limits) { size_t chunk_size = 0; DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE); - if (config::FLAGS_disable_mem_pools) { + if (config::disable_mem_pools) { // Disable pooling by sizing the chunk to fit only this allocation. // Make sure the alignment guarantees are respected. chunk_size = std::max(min_size, alignof(max_align_t)); @@ -252,7 +252,7 @@ bool MemPool::CheckIntegrity(bool check_current_chunk_empty) { DCHECK_LT(current_chunk_idx_, static_cast(chunks_.size())); // Without pooling, there are way too many chunks and this takes too long. - if (config::FLAGS_disable_mem_pools) return true; + if (config::disable_mem_pools) return true; // check that current_chunk_idx_ points to the last chunk with allocated data int64_t total_allocated = 0; diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 8d6db5b516..23c328391c 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -113,8 +113,7 @@ class MemPool { /// should be a power-of-two in [1, alignof(std::max_align_t)]. uint8_t* try_allocate_aligned(int64_t size, int alignment) { DCHECK_GE(alignment, 1); - DCHECK_LE(alignment, config::FLAGS_MEMORY_MAX_ALIGNMENT); - //DCHECK_LE(alignment, config::FLAGS_MEMORY_MAX_ALIGNMENT); + DCHECK_LE(alignment, config::memory_max_alignment); DCHECK_EQ(BitUtil::RoundUpToPowerOfTwo(alignment), alignment); return allocate(size, alignment); } diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 2fb3360052..401b825460 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -312,7 +312,7 @@ Status PlanFragmentExecutor::open_internal() { for (int i = 0; i < batch->num_rows(); ++i) { TupleRow* row = batch->get_row(i); - VLOG_ROW << print_row(row, row_desc()); + VLOG_ROW << row->to_string(row_desc()); } } diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp index 7e431e4f9f..d65e7a575e 100644 --- a/be/src/runtime/row_batch.cpp +++ b/be/src/runtime/row_batch.cpp @@ -665,4 +665,13 @@ void RowBatch::add_buffer(BufferPool::ClientHandle* client, _buffers.push_back(std::move(buffer_info)); if (flush == FlushMode::FLUSH_RESOURCES) mark_flush_resources(); } + +std::string RowBatch::to_string() { + std::stringstream out; + for (int i = 0; i < _num_rows; ++i) { + out << get_row(i)->to_string(_row_desc) << "\n"; + } + return out.str(); +} + } // end namespace doris diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h index 2ab9a8c2ed..7a71b6b7fc 100644 --- a/be/src/runtime/row_batch.h +++ b/be/src/runtime/row_batch.h @@ -427,6 +427,7 @@ public: int max_tuple_buffer_size(); static const int MAX_MEM_POOL_SIZE = 32 * 1024 * 1024; + std::string to_string(); private: MemTracker* _mem_tracker; // not owned diff --git a/be/src/runtime/test_env.cc b/be/src/runtime/test_env.cc index 4c69edc2b9..42edc97c8c 100644 --- a/be/src/runtime/test_env.cc +++ b/be/src/runtime/test_env.cc @@ -31,7 +31,7 @@ TestEnv::TestEnv() { // DorisMetrics::create_metrics(_s_static_metrics.get()); } _exec_env.reset(new ExecEnv()); - _exec_env->init_for_tests(); + // _exec_env->init_for_tests(); _io_mgr_tracker.reset(new MemTracker(-1)); _block_mgr_parent_tracker.reset(new MemTracker(-1)); _exec_env->disk_io_mgr()->init(_io_mgr_tracker.get()); diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp index b9c46893e0..0a3fa57456 100644 --- a/be/src/runtime/tuple.cpp +++ b/be/src/runtime/tuple.cpp @@ -26,7 +26,6 @@ #include "runtime/raw_value.h" #include "runtime/tuple_row.h" #include "runtime/string_value.h" -#include "util/debug_util.h" namespace doris { @@ -199,4 +198,44 @@ template void Tuple::materialize_exprs(TupleRow* row, const TupleDescript template void Tuple::materialize_exprs(TupleRow* row, const TupleDescriptor& desc, const std::vector& materialize_expr_ctxs, MemPool* pool, std::vector* non_null_var_values, int* total_var_len); + +std::string Tuple::to_string(const TupleDescriptor& d) const { + std::stringstream out; + out << "("; + + bool first_value = true; + for (auto slot : d.slots()) { + if (!slot->is_materialized()) { + continue; + } + if (first_value) { + first_value = false; + } else { + out << " "; + } + + if (is_null(slot->null_indicator_offset())) { + out << "null"; + } else { + std::string value_str; + RawValue::print_value( + get_slot(slot->tuple_offset()), + slot->type(), + -1, + &value_str); + out << value_str; + } + } + + out << ")"; + return out.str(); +} + +std::string Tuple::to_string(const Tuple* t, const TupleDescriptor& d) { + if (t == nullptr) { + return "null"; + } + return t->to_string(d); +} + } diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h index ab930a50c7..7bdda8e8a2 100644 --- a/be/src/runtime/tuple.h +++ b/be/src/runtime/tuple.h @@ -173,6 +173,9 @@ public: static const char* _s_llvm_class_name; void* get_data() { return this; } + + std::string to_string(const TupleDescriptor& d) const; + static std::string to_string(const Tuple* t, const TupleDescriptor& d); private: void* _data; }; diff --git a/be/src/runtime/tuple_row.cpp b/be/src/runtime/tuple_row.cpp index e422506f15..41ceb66199 100644 --- a/be/src/runtime/tuple_row.cpp +++ b/be/src/runtime/tuple_row.cpp @@ -17,7 +17,24 @@ #include "runtime/tuple_row.h" +#include + namespace doris { const char* TupleRow::_s_llvm_class_name = "class.doris::TupleRow"; + +std::string TupleRow::to_string(const RowDescriptor& d) { + std::stringstream out; + out << "["; + for (int i = 0; i < d.tuple_descriptors().size(); ++i) { + if (i != 0) { + out << " "; + } + out << Tuple::to_string(get_tuple(i), *d.tuple_descriptors()[i]); + } + + out << "]"; + return out.str(); +} + } diff --git a/be/src/runtime/tuple_row.h b/be/src/runtime/tuple_row.h index bbfe0a9068..6416b9ae40 100644 --- a/be/src/runtime/tuple_row.h +++ b/be/src/runtime/tuple_row.h @@ -115,6 +115,7 @@ public: // For C++/IR interop, we need to be able to look up types by name. static const char* _s_llvm_class_name; + std::string to_string(const RowDescriptor& d); private: Tuple* _tuples[1]; }; diff --git a/be/src/service/CMakeLists.txt b/be/src/service/CMakeLists.txt index e42dd67dbd..63f79b607d 100644 --- a/be/src/service/CMakeLists.txt +++ b/be/src/service/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(Service backend_options.cpp backend_service.cpp brpc_service.cpp + http_service.cpp internal_service.cpp ) diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 5e5c56f771..f01ce03cc2 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -49,6 +49,7 @@ #include "service/backend_options.h" #include "service/backend_service.h" #include "service/brpc_service.h" +#include "service/http_service.h" #include #include "common/resource_tls.h" #include "exec/schema_scanner/frontend_helper.h" @@ -142,14 +143,15 @@ int main(int argc, char** argv) { } // start backend service for the coordinator on be_port - doris::ExecEnv exec_env(paths); - exec_env.set_olap_engine(engine); + auto exec_env = doris::ExecEnv::GetInstance(); + doris::ExecEnv::init(exec_env, paths); + exec_env->set_olap_engine(engine); - doris::FrontendHelper::setup(&exec_env); + doris::FrontendHelper::setup(exec_env); doris::ThriftServer* be_server = nullptr; EXIT_IF_ERROR(doris::BackendService::create_service( - &exec_env, + exec_env, doris::config::be_port, &be_server)); Status status = be_server->start(); @@ -159,7 +161,7 @@ int main(int argc, char** argv) { exit(1); } - doris::BRpcService brpc_service(&exec_env); + doris::BRpcService brpc_service(exec_env); status = brpc_service.start(doris::config::brpc_port); if (!status.ok()) { LOG(ERROR) << "BRPC service did not start correctly, exiting"; @@ -167,18 +169,20 @@ int main(int argc, char** argv) { exit(1); } - status = exec_env.start_services(); + doris::HttpService http_service( + exec_env, doris::config::webserver_port, doris::config::webserver_num_workers); + status = http_service.start(); if (!status.ok()) { - LOG(ERROR) << "Doris Be services did not start correctly, exiting"; + LOG(ERROR) << "Doris Be http service did not start correctly, exiting"; doris::shutdown_logging(); exit(1); } - doris::TMasterInfo* master_info = exec_env.master_info(); + doris::TMasterInfo* master_info = exec_env->master_info(); // start heart beat server doris::ThriftServer* heartbeat_thrift_server; doris::AgentStatus heartbeat_status = doris::create_heartbeat_server( - &exec_env, + exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server, doris::config::heartbeat_service_thread_count, diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp new file mode 100644 index 0000000000..ece66e6d9c --- /dev/null +++ b/be/src/service/http_service.cpp @@ -0,0 +1,119 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "service/http_service.h" + +#include "http/action/checksum_action.h" +#include "http/action/health_action.h" +#include "http/action/meta_action.h" +#include "http/action/metrics_action.h" +#include "http/action/mini_load.h" +#include "http/action/pprof_actions.h" +#include "http/action/reload_tablet_action.h" +#include "http/action/restore_tablet_action.h" +#include "http/action/snapshot_action.h" +#include "http/action/stream_load.h" +#include "http/default_path_handlers.h" +#include "http/download_action.h" +#include "http/ev_http_server.h" +#include "http/http_method.h" +#include "http/monitor_action.h" +#include "http/web_page_handler.h" +#include "runtime/exec_env.h" +#include "runtime/load_path_mgr.h" +#include "util/doris_metrics.h" + +namespace doris { + +HttpService::HttpService(ExecEnv* env, int port, int num_threads) + : _env(env), + _ev_http_server(new EvHttpServer(port, num_threads)), + _web_page_handler(new WebPageHandler(_ev_http_server.get())) { +} + +HttpService::~HttpService() { +} + +Status HttpService::start() { + add_default_path_handlers(_web_page_handler.get(), _env->process_mem_tracker()); + + // register load + _ev_http_server->register_handler( + HttpMethod::PUT, "/api/{db}/{table}/_load", new MiniLoadAction(_env)); + _ev_http_server->register_handler( + HttpMethod::PUT, "/api/{db}/{table}/_stream_load", new StreamLoadAction(_env)); + + // register download action + std::vector allow_paths; + for (auto& path : _env->store_paths()) { + allow_paths.emplace_back(path.path); + } + DownloadAction* download_action = new DownloadAction(_env, allow_paths); + _ev_http_server->register_handler(HttpMethod::HEAD, "/api/_download_load", download_action); + _ev_http_server->register_handler(HttpMethod::GET, "/api/_download_load", download_action); + + DownloadAction* tablet_download_action = new DownloadAction(_env, allow_paths); + _ev_http_server->register_handler( + HttpMethod::HEAD, "/api/_tablet/_download", tablet_download_action); + _ev_http_server->register_handler( + HttpMethod::GET, "/api/_tablet/_download", tablet_download_action); + + DownloadAction* error_log_download_action = new DownloadAction( + _env, _env->load_path_mgr()->get_load_error_file_dir()); + _ev_http_server->register_handler( + HttpMethod::GET, "/api/_load_error_log", error_log_download_action); + _ev_http_server->register_handler( + HttpMethod::HEAD, "/api/_load_error_log", error_log_download_action); + + // Register BE health action + HealthAction* health_action = new HealthAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/health", health_action); + + // register pprof actions + PprofActions::setup(_env, _ev_http_server.get()); + + // register metrics + { + auto action = new MetricsAction(DorisMetrics::metrics()); + _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); + } + + MetaAction* meta_action = new MetaAction(HEADER); + _ev_http_server->register_handler(HttpMethod::GET, "/api/meta/header/{tablet_id}/{schema_hash}", meta_action); + +#ifndef BE_TEST + // Register BE checksum action + ChecksumAction* checksum_action = new ChecksumAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/checksum", checksum_action); + + // Register BE reload tablet action + ReloadTabletAction* reload_tablet_action = new ReloadTabletAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/reload_tablet", reload_tablet_action); + + RestoreTabletAction* restore_tablet_action = new RestoreTabletAction(_env); + _ev_http_server->register_handler(HttpMethod::POST, "/api/restore_tablet", restore_tablet_action); + + // Register BE snapshot action + SnapshotAction* snapshot_action = new SnapshotAction(_env); + _ev_http_server->register_handler(HttpMethod::GET, "/api/snapshot", snapshot_action); +#endif + + RETURN_IF_ERROR(_ev_http_server->start()); + return Status::OK; +} + +} diff --git a/be/src/service/http_service.h b/be/src/service/http_service.h new file mode 100644 index 0000000000..f79208b027 --- /dev/null +++ b/be/src/service/http_service.h @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "common/status.h" + +namespace doris { + +class ExecEnv; +class EvHttpServer; +class WebPageHandler; + +// HTTP service for Doris BE +class HttpService { +public: + HttpService(ExecEnv* env, int port, int num_threads); + ~HttpService(); + + Status start(); +private: + ExecEnv* _env; + + std::unique_ptr _ev_http_server; + std::unique_ptr _web_page_handler; +}; + +} diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index 13d8670522..6178740c58 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -50,6 +50,7 @@ add_library(Util STATIC thrift_util.cpp thrift_client.cpp thrift_server.cpp + stack_util.cpp symbols_util.cpp system_metrics.cpp url_parser.cpp diff --git a/be/src/util/debug_util.cpp b/be/src/util/debug_util.cpp index 1c20d8b054..54c92d6d2d 100644 --- a/be/src/util/debug_util.cpp +++ b/be/src/util/debug_util.cpp @@ -24,10 +24,6 @@ #include "common/logging.h" #include "gen_cpp/version.h" -#include "runtime/descriptors.h" -#include "runtime/raw_value.h" -#include "runtime/tuple_row.h" -#include "runtime/row_batch.h" #include "util/cpu_info.h" #include "gen_cpp/Opcodes_types.h" #include "gen_cpp/types.pb.h" @@ -45,12 +41,6 @@ #define MILLION (THOUSAND * 1000) #define BILLION (MILLION * 1000) -namespace google { -namespace glog_internal_namespace_ { -void DumpStackTraceToString(std::string* stacktrace); -} -} - namespace doris { #define THRIFT_ENUM_OUTPUT_FN_IMPL(E, MAP) \ @@ -134,70 +124,6 @@ std::string print_plan_node_type(const TPlanNodeType::type& type) { return "Invalid plan node type"; } -std::string print_tuple(const Tuple* t, const TupleDescriptor& d) { - if (t == NULL) { - return "null"; - } - - std::stringstream out; - out << "("; - bool first_value = true; - - for (int i = 0; i < d.slots().size(); ++i) { - SlotDescriptor* slot_d = d.slots()[i]; - - if (!slot_d->is_materialized()) { - continue; - } - - if (first_value) { - first_value = false; - } else { - out << " "; - } - - if (t->is_null(slot_d->null_indicator_offset())) { - out << "null"; - } else { - std::string value_str; - RawValue::print_value( - t->get_slot(slot_d->tuple_offset()), - slot_d->type(), - -1, - &value_str); - out << value_str; - } - } - - out << ")"; - return out.str(); -} - -std::string print_row(TupleRow* row, const RowDescriptor& d) { - std::stringstream out; - out << "["; - - for (int i = 0; i < d.tuple_descriptors().size(); ++i) { - if (i != 0) { - out << " "; - } - out << print_tuple(row->get_tuple(i), *d.tuple_descriptors()[i]); - } - - out << "]"; - return out.str(); -} - -std::string print_batch(RowBatch* batch) { - std::stringstream out; - - for (int i = 0; i < batch->num_rows(); ++i) { - out << print_row(batch->get_row(i), batch->row_desc()) << "\n"; - } - - return out.str(); -} - std::string get_build_version(bool compact) { std::stringstream ss; ss << PALO_BUILD_VERSION @@ -222,12 +148,6 @@ std::string get_version_string(bool compact) { return ss.str(); } -std::string get_stack_trace() { - std::string s; - google::glog_internal_namespace_::DumpStackTraceToString(&s); - return s; -} - std::string hexdump(const char* buf, int len) { std::stringstream ss; ss << std::hex << std::uppercase; diff --git a/be/src/util/debug_util.h b/be/src/util/debug_util.h index d61c9e8404..a9ba08cc79 100644 --- a/be/src/util/debug_util.h +++ b/be/src/util/debug_util.h @@ -32,16 +32,8 @@ namespace doris { -class RowDescriptor; -class TupleDescriptor; -class Tuple; -class TupleRow; -class RowBatch; class PUniqueId; -std::string print_tuple(const Tuple* t, const TupleDescriptor& d); -std::string print_row(TupleRow* row, const RowDescriptor& d); -std::string print_batch(RowBatch* batch); std::string print_id(const TUniqueId& id); std::string print_id(const PUniqueId& id); std::string print_plan_node_type(const TPlanNodeType::type& type); @@ -63,11 +55,6 @@ std::string get_build_version(bool compact); // Returns " version " std::string get_version_string(bool compact); -// Returns the stack trace as a string from the current location. -// Note: there is a libc bug that causes this not to work on 64 bit machines -// for recursive calls. -std::string get_stack_trace(); - std::string hexdump(const char* buf, int len); } diff --git a/be/src/util/disk_info.cpp b/be/src/util/disk_info.cpp index 420fdd487a..b2ac7dec2e 100644 --- a/be/src/util/disk_info.cpp +++ b/be/src/util/disk_info.cpp @@ -28,8 +28,6 @@ #include #include -#include "util/debug_util.h" - namespace doris { bool DiskInfo::_s_initialized; diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index bebe630c3a..2950e3c4f0 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -27,7 +27,6 @@ #include #include -#include "util/debug_util.h" #include "util/pretty_printer.h" #include "util/string_parser.hpp" diff --git a/be/src/util/stack_util.cpp b/be/src/util/stack_util.cpp new file mode 100644 index 0000000000..862c34ce50 --- /dev/null +++ b/be/src/util/stack_util.cpp @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/stack_util.h" + +namespace google { +namespace glog_internal_namespace_ { +void DumpStackTraceToString(std::string* stacktrace); +} +} + +namespace doris { + +std::string get_stack_trace() { + std::string s; + google::glog_internal_namespace_::DumpStackTraceToString(&s); + return s; +} + +} diff --git a/be/src/util/stack_util.h b/be/src/util/stack_util.h new file mode 100644 index 0000000000..082ab905d2 --- /dev/null +++ b/be/src/util/stack_util.h @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +namespace doris { + +// Returns the stack trace as a string from the current location. +// Note: there is a libc bug that causes this not to work on 64 bit machines +// for recursive calls. +std::string get_stack_trace(); + +} diff --git a/be/test/exec/olap_table_sink_test.cpp b/be/test/exec/olap_table_sink_test.cpp index 3c506b2d9e..8f4bbab9c5 100644 --- a/be/test/exec/olap_table_sink_test.cpp +++ b/be/test/exec/olap_table_sink_test.cpp @@ -19,16 +19,19 @@ #include +#include "gen_cpp/HeartbeatService_types.h" #include "gen_cpp/internal_service.pb.h" -#include "runtime/exec_env.h" -#include "runtime/row_batch.h" -#include "runtime/tuple_row.h" -#include "runtime/runtime_state.h" #include "runtime/decimal_value.h" +#include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" +#include "runtime/row_batch.h" +#include "runtime/runtime_state.h" +#include "runtime/thread_resource_mgr.h" +#include "runtime/tuple_row.h" #include "service/brpc.h" -#include "util/descriptor_helper.h" +#include "util/brpc_stub_cache.h" #include "util/cpu_info.h" -#include "util/debug_util.h" +#include "util/descriptor_helper.h" namespace doris { namespace stream_load { @@ -41,8 +44,24 @@ public: virtual ~OlapTableSinkTest() { } void SetUp() override { k_add_batch_status = Status::OK; + + _env._thread_mgr = new ThreadResourceMgr(); + _env._master_info = new TMasterInfo(); + _env._load_stream_mgr = new LoadStreamMgr(); + _env._brpc_stub_cache = new BrpcStubCache(); + } + void TearDown() override { + delete _env._brpc_stub_cache; + _env._brpc_stub_cache = nullptr; + delete _env._load_stream_mgr; + _env._load_stream_mgr = nullptr; + delete _env._master_info; + _env._master_info = nullptr; + delete _env._thread_mgr; + _env._thread_mgr = nullptr; } private: + ExecEnv _env; }; TDataSink get_data_sink(TDescriptorTable* desc_tbl) { @@ -281,8 +300,8 @@ public: MemTracker tracker; RowBatch batch(*_row_desc, request->row_batch(), &tracker); for (int i = 0; i < batch.num_rows(); ++i){ - LOG(INFO) << print_row(batch.get_row(i), *_row_desc); - _output_set->emplace(print_row(batch.get_row(i), *_row_desc)); + LOG(INFO) << batch.get_row(i)->to_string(*_row_desc); + _output_set->emplace(batch.get_row(i)->to_string(*_row_desc)); } } } @@ -310,11 +329,10 @@ TEST_F(OlapTableSinkTest, normal) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -417,11 +435,10 @@ TEST_F(OlapTableSinkTest, convert) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1024; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -545,11 +562,10 @@ TEST_F(OlapTableSinkTest, convert) { } TEST_F(OlapTableSinkTest, init_fail1) { - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -604,11 +620,10 @@ TEST_F(OlapTableSinkTest, init_fail1) { } TEST_F(OlapTableSinkTest, init_fail3) { - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -664,11 +679,10 @@ TEST_F(OlapTableSinkTest, init_fail3) { } TEST_F(OlapTableSinkTest, init_fail4) { - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -732,11 +746,10 @@ TEST_F(OlapTableSinkTest, add_batch_failed) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; @@ -828,11 +841,10 @@ TEST_F(OlapTableSinkTest, decimal) { brpc::ServerOptions options; server->Start(4356, &options); - ExecEnv env; TUniqueId fragment_id; TQueryOptions query_options; query_options.batch_size = 1; - RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &env); + RuntimeState state(fragment_id, query_options, "2018-05-25 12:14:15", &_env); state._instance_mem_tracker.reset(new MemTracker()); ObjectPool obj_pool; diff --git a/be/test/http/stream_load_test.cpp b/be/test/http/stream_load_test.cpp index 615e10ad39..e5d0431394 100644 --- a/be/test/http/stream_load_test.cpp +++ b/be/test/http/stream_load_test.cpp @@ -24,9 +24,14 @@ #include #include "exec/schema_scanner/frontend_helper.h" +#include "gen_cpp/HeartbeatService_types.h" #include "http/http_channel.h" #include "http/http_request.h" #include "runtime/exec_env.h" +#include "runtime/load_stream_mgr.h" +#include "runtime/thread_resource_mgr.h" +#include "util/brpc_stub_cache.h" +#include "util/cpu_info.h" #include "util/doris_metrics.h" class mg_connection; @@ -71,14 +76,29 @@ public: k_stream_load_plan_status = Status::OK; k_response_str = ""; config::streaming_load_max_mb = 1; + + _env._thread_mgr = new ThreadResourceMgr(); + _env._master_info = new TMasterInfo(); + _env._load_stream_mgr = new LoadStreamMgr(); + _env._brpc_stub_cache = new BrpcStubCache(); + } + void TearDown() override { + delete _env._brpc_stub_cache; + _env._brpc_stub_cache = nullptr; + delete _env._load_stream_mgr; + _env._load_stream_mgr = nullptr; + delete _env._master_info; + _env._master_info = nullptr; + delete _env._thread_mgr; + _env._thread_mgr = nullptr; } private: + ExecEnv _env; }; TEST_F(StreamLoadActionTest, no_auth) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; action.on_header(&request); @@ -92,8 +112,7 @@ TEST_F(StreamLoadActionTest, no_auth) { #if 0 TEST_F(StreamLoadActionTest, no_content_length) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&__env); HttpRequest request; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); @@ -107,8 +126,7 @@ TEST_F(StreamLoadActionTest, no_content_length) { TEST_F(StreamLoadActionTest, unknown_encoding) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); @@ -124,8 +142,7 @@ TEST_F(StreamLoadActionTest, unknown_encoding) { TEST_F(StreamLoadActionTest, normal) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; @@ -145,8 +162,7 @@ TEST_F(StreamLoadActionTest, normal) { TEST_F(StreamLoadActionTest, put_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; @@ -168,8 +184,7 @@ TEST_F(StreamLoadActionTest, put_fail) { TEST_F(StreamLoadActionTest, commit_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; struct evhttp_request ev_req; @@ -189,8 +204,7 @@ TEST_F(StreamLoadActionTest, commit_fail) { TEST_F(StreamLoadActionTest, begin_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; struct evhttp_request ev_req; @@ -211,8 +225,7 @@ TEST_F(StreamLoadActionTest, begin_fail) { #if 0 TEST_F(StreamLoadActionTest, receive_failed) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; request._headers.emplace(HttpHeaders::AUTHORIZATION, "Basic cm9vdDo="); @@ -228,8 +241,7 @@ TEST_F(StreamLoadActionTest, receive_failed) { TEST_F(StreamLoadActionTest, plan_fail) { DorisMetrics::instance()->initialize("StreamLoadActionTest"); - ExecEnv env; - StreamLoadAction action(&env); + StreamLoadAction action(&_env); HttpRequest request; struct evhttp_request ev_req; diff --git a/env.sh b/env.sh index 9d424cff71..543054552a 100755 --- a/env.sh +++ b/env.sh @@ -24,7 +24,7 @@ fi # include custom environment variables if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then - source ${DORIS_HOME}/custom_env.sh + . ${DORIS_HOME}/custom_env.sh fi # set DORIS_THIRDPARTY diff --git a/thirdparty/build-thirdparty.sh b/thirdparty/build-thirdparty.sh index de38bf3108..df744b0b19 100755 --- a/thirdparty/build-thirdparty.sh +++ b/thirdparty/build-thirdparty.sh @@ -31,29 +31,34 @@ set -e curdir=`dirname "$0"` curdir=`cd "$curdir"; pwd` -if [ ! -f $curdir/vars.sh ]; then - echo "vars.sh is missing". - exit 1 -fi - -export DORIS_HOME=$curdir/../ -export GCC_HOME=$curdir/../palo-toolchain/gcc730 +export DORIS_HOME=$curdir/.. export TP_DIR=$curdir -source $curdir/vars.sh -cd $TP_DIR +# include custom environment variables +if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then + . ${DORIS_HOME}/custom_env.sh +fi -if [ ! -f $TP_DIR/download-thirdparty.sh ]; then +if [[ ! -f ${TP_DIR}/download-thirdparty.sh ]]; then echo "Download thirdparty script is missing". exit 1 fi -mkdir -p $TP_DIR/src -mkdir -p $TP_DIR/installed +if [ ! -f ${TP_DIR}/vars.sh ]; then + echo "vars.sh is missing". + exit 1 +fi +. ${TP_DIR}/vars.sh + +cd $TP_DIR + +# Download thirdparties. +${TP_DIR}/download-thirdparty.sh + export LD_LIBRARY_PATH=$TP_DIR/installed/lib:$LD_LIBRARY_PATH -if [ -f $DORIS_HOME/palo-toolchain/gcc730/bin/gcc ]; then - GCC_HOME=$curdir/../palo-toolchain/gcc730 +if [ -f ${DORIS_TOOLCHAIN}/gcc730/bin/gcc ]; then + GCC_HOME=${DORIS_TOOLCHAIN}/gcc730 export CC=${GCC_HOME}/bin/gcc export CPP=${GCC_HOME}/bin/cpp export CXX=${GCC_HOME}/bin/g++ @@ -63,9 +68,6 @@ else export CXX=g++ fi -# Download thirdparties. -$TP_DIR/download-thirdparty.sh $@ - check_prerequest() { local CMD=$1 local NAME=$2 @@ -241,7 +243,6 @@ build_protobuf() { ./configure --prefix=${TP_INSTALL_DIR} --disable-shared --enable-static --with-zlib=${TP_INSTALL_DIR}/include cd src sed -i 's/^AM_LDFLAGS\(.*\)$/AM_LDFLAGS\1 -all-static/' Makefile - make -j$PARALLEL cd - make -j$PARALLEL && make install } @@ -353,6 +354,7 @@ build_bzip() { check_if_source_exist $BZIP_SOURCE cd $TP_SOURCE_DIR/$BZIP_SOURCE + CFLAGS="-fPIC" make -j$PARALLEL install PREFIX=$TP_INSTALL_DIR } @@ -489,7 +491,7 @@ build_rocksdb() { cd $TP_SOURCE_DIR/$ROCKSDB_SOURCE CFLAGS="-I ${TP_INCLUDE_DIR} -I ${TP_INCLUDE_DIR}/snappy -I ${TP_INCLUDE_DIR}/lz4" CXXFLAGS="-fPIC" LDFLAGS="-static-libstdc++ -static-libgcc" \ - make -j$PARALLEL static_lib + make USE_RTTI=1 -j$PARALLEL static_lib cp librocksdb.a ../../installed/lib/librocksdb.a cp -r include/rocksdb ../../installed/include/ } diff --git a/thirdparty/download-thirdparty.sh b/thirdparty/download-thirdparty.sh index dff1a89a51..772dfb48c7 100755 --- a/thirdparty/download-thirdparty.sh +++ b/thirdparty/download-thirdparty.sh @@ -26,13 +26,28 @@ set -e curdir=`dirname "$0"` curdir=`cd "$curdir"; pwd` -REPOSITORY_URL=$1 -export DORIS_HOME=$curdir/../ -source $curdir/vars.sh +if [[ -z "${DORIS_HOME}" ]]; then + DORIS_HOME=$curdir/.. +fi -mkdir -p $TP_DIR/src -mkdir -p $TP_DIR/installed +# include custom environment variables +if [[ -f ${DORIS_HOME}/custom_env.sh ]]; then + . ${DORIS_HOME}/custom_env.sh +fi + +if [[ -z "${TP_DIR}" ]]; then + TP_DIR=$curdir +fi + +if [ ! -f ${TP_DIR}/vars.sh ]; then + echo "vars.sh is missing". + exit 1 +fi +. ${TP_DIR}/vars.sh + +mkdir -p ${TP_DIR}/src +mkdir -p ${TP_DIR}/installed download() { local FILENAME=$1 @@ -230,5 +245,5 @@ if [ ! -f $PATCHED_MARK ]; then touch $PATCHED_MARK fi cd - -echo "Finished patching $LZ4_SOURCE" +echo "Finished patching $BRPC_SOURCE"